diff options
| author | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 15:12:13 -0500 |
|---|---|---|
| committer | Douglas B. Rumbaugh <doug@douglasrumbaugh.com> | 2024-12-22 15:12:13 -0500 |
| commit | ba65c8976f54d4da2467074235a12f5be0bd5ebc (patch) | |
| tree | 955d5995f211d8a7a24f7b106912773db5e3a5ba /include/framework/DynamicExtension.h | |
| parent | 5617bed5257506d3dfda8537b16f44b3e40f1b42 (diff) | |
| download | dynamic-extension-ba65c8976f54d4da2467074235a12f5be0bd5ebc.tar.gz | |
Continued development
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 92 |
1 files changed, 48 insertions, 44 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 1886234..c35bb93 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -15,7 +15,6 @@ #include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" -#include "framework/scheduling/FIFOScheduler.h" #include "framework/scheduling/SerialScheduler.h" #include "framework/structure/ExtensionStructure.h" @@ -28,7 +27,7 @@ namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, DeletePolicy D = DeletePolicy::TAGGING, - SchedulerInterface SchedType = de::FIFOScheduler> + SchedulerInterface SchedType = de::SerialScheduler> class DynamicExtension { private: /* convenience typedefs for commonly used types within the class */ @@ -76,10 +75,12 @@ public: * performing compactions and flushes, etc. */ DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark, - size_t buffer_high_watermark, size_t memory_budget = 0, + size_t buffer_high_watermark = 0, size_t memory_budget = 0, size_t thread_cnt = 16) : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt), - m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)), + m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0) + ? buffer_low_watermark + : buffer_high_watermark)), m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0), m_recon_policy(recon_policy) { @@ -165,7 +166,7 @@ public: auto view = m_buffer->get_buffer_view(); auto epoch = get_active_epoch(); - if (epoch->get_structure()->tagged_delete(rec)) { + if (epoch->get_mutable_structure()->tagged_delete(rec)) { end_job(epoch); return 1; } @@ -303,13 +304,14 @@ public: auto epoch = get_active_epoch(); auto vers = epoch->get_structure(); - std::vector<ShardType *> shards; - - if (vers->get_levels().size() > 0) { - for (int i = vers->get_levels().size() - 1; i >= 0; i--) { - if (vers->get_levels()[i] && - vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); + std::vector<const ShardType *> shards; + + if (vers->get_level_vector().size() > 0) { + for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) { + if (vers->get_level_vector()[i] && + vers->get_level_vector()[i]->get_record_count() > 0) { + shards.emplace_back( + vers->get_level_vector()[i]->get_combined_shard()); } } } @@ -358,7 +360,8 @@ public: */ bool validate_tombstone_proportion() { auto epoch = get_active_epoch(); - auto t = epoch->get_structure()->validate_tombstone_proportion(); + auto t = epoch->get_structure()->validate_tombstone_proportion( + m_max_delete_prop); end_job(epoch); return t; } @@ -370,7 +373,6 @@ public: void print_scheduler_statistics() const { m_sched.print_statistics(); } private: - ReconPolicyType const *m_recon_policy; double m_max_delete_prop; SchedType m_sched; @@ -380,6 +382,8 @@ private: std::atomic<int> m_next_core; std::atomic<size_t> m_epoch_cnt; + ReconPolicyType const *m_recon_policy; + alignas(64) std::atomic<bool> m_reconstruction_scheduled; std::atomic<epoch_ptr> m_next_epoch; @@ -547,39 +551,36 @@ private: auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments; ((DynamicExtension *)args->extension)->SetThreadAffinity(); - Structure *vers = args->epoch->get_structure(); + Structure *vers = args->epoch->get_mutable_structure(); - for (size_t i=0; i<args->tasks.size(); i++) { - vers->perform_reconstruction(args->tasks[i]); - } + ReconstructionTask flush_task; + flush_task.type = ReconstructionType::Invalid; - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we - * can flush as many records as possible in one go. The reconstruction - * was done so as to make room for the full buffer anyway, so there's - * no real benefit to doing this first. - */ - auto buffer_view = args->epoch->get_buffer(); - size_t new_head = buffer_view.get_tail(); + for (size_t i = 0; i < args->tasks.size(); i++) { + if (args->tasks[i].sources.size() > 0 && + args->tasks[i].sources[0] == buffer_shid) { + flush_task = args->tasks[i]; + continue; + } - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions - * will free sufficient space in L0 to support a flush - */ - if (!args->compaction) { - vers->flush_buffer(std::move(buffer_view)); + vers->perform_reconstruction(args->tasks[i]); } - args->result.set_value(true); + if (flush_task.type != ReconstructionType::Invalid) { + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. + */ + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); - /* - * Compactions occur on an epoch _before_ it becomes active, - * and as a result the active epoch should _not_ be advanced as - * part of a compaction - */ - if (!args->compaction) { + vers->perform_flush(flush_task, std::move(buffer_view)); + args->result.set_value(true); ((DynamicExtension *)args->extension)->advance_epoch(new_head); + } else { + args->result.set_value(true); } ((DynamicExtension *)args->extension) @@ -660,10 +661,12 @@ private: args->tasks = m_recon_policy->get_reconstruction_tasks( epoch, m_buffer->get_high_watermark()); args->extension = this; - args->compaction = false; - /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed - * here */ + args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch)); + /* + * NOTE: args is deleted by the reconstruction job, so shouldn't be freed + * here + */ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } @@ -691,7 +694,8 @@ private: return m_buffer->append(rec, ts); } -#ifdef _GNU_SOURCE +//#ifdef _GNU_SOURCE +#if 0 void SetThreadAffinity() { if constexpr (std::same_as<SchedType, SerialScheduler>) { return; |