Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--  include/framework/DynamicExtension.h  |  92
1 file changed, 48 insertions(+), 44 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 1886234..c35bb93 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -15,7 +15,6 @@
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/FIFOScheduler.h"
#include "framework/scheduling/SerialScheduler.h"
#include "framework/structure/ExtensionStructure.h"
@@ -28,7 +27,7 @@ namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
DeletePolicy D = DeletePolicy::TAGGING,
- SchedulerInterface SchedType = de::FIFOScheduler>
+ SchedulerInterface SchedType = de::SerialScheduler>
class DynamicExtension {
private:
/* convenience typedefs for commonly used types within the class */
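
This hunk switches the defaulted scheduler template parameter from FIFOScheduler to SerialScheduler, which is why the FIFOScheduler include above can be dropped. A standalone sketch of the defaulted-parameter pattern involved (the scheduler types below are simplified stand-ins, not the framework's real classes):

#include <iostream>

struct SerialScheduler { static const char *name() { return "serial"; } };
struct FIFOScheduler   { static const char *name() { return "fifo"; } };

template <typename SchedType = SerialScheduler> // default was FIFOScheduler
struct Extension {
  const char *scheduler() const { return SchedType::name(); }
};

int main() {
  Extension<> ext;                 // silently picks up the new default
  Extension<FIFOScheduler> legacy; // opting back in is now explicit
  std::cout << ext.scheduler() << " " << legacy.scheduler() << "\n";
}

Note that existing declarations which omit the parameter change behavior with no compile error, so downstream users who relied on the FIFO default must now name it explicitly.
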
@@ -76,10 +75,12 @@ public:
* performing compactions and flushes, etc.
*/
DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
- size_t buffer_high_watermark, size_t memory_budget = 0,
+ size_t buffer_high_watermark = 0, size_t memory_budget = 0,
size_t thread_cnt = 16)
: m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
- m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
+ m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0)
+ ? buffer_low_watermark
+ : buffer_high_watermark)),
m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
m_recon_policy(recon_policy) {
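
The constructor change makes the high watermark optional, with 0 acting as an "unset" sentinel that falls back to the low watermark. A minimal sketch of that fallback logic in isolation (Buffer here is a stand-in for the framework's buffer type):

#include <cstddef>
#include <iostream>

struct Buffer {
  std::size_t low, high;
  Buffer(std::size_t lwm, std::size_t hwm) : low(lwm), high(hwm) {}
};

Buffer make_buffer(std::size_t low_wm, std::size_t high_wm = 0) {
  // 0 means "unset": fall back to the low watermark, as in the hunk above
  return Buffer(low_wm, (high_wm == 0) ? low_wm : high_wm);
}

int main() {
  Buffer a = make_buffer(1000);       // high defaults to low: 1000
  Buffer b = make_buffer(1000, 8000); // explicit high watermark
  std::cout << a.high << " " << b.high << "\n";
}
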
@@ -165,7 +166,7 @@ public:
auto view = m_buffer->get_buffer_view();
auto epoch = get_active_epoch();
- if (epoch->get_structure()->tagged_delete(rec)) {
+ if (epoch->get_mutable_structure()->tagged_delete(rec)) {
end_job(epoch);
return 1;
}
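
The switch to get_mutable_structure() suggests the epoch exposes a const accessor alongside a mutable one, and tagged_delete() needs the latter because tagging mutates record metadata. A minimal sketch of that accessor split (Epoch and Structure are simplified stand-ins):

#include <memory>

struct Structure {
  // non-const: tagging a delete mutates record metadata
  bool tagged_delete(int /*rec*/) { return true; }
};

class Epoch {
  std::shared_ptr<Structure> m_structure{std::make_shared<Structure>()};
public:
  const Structure *get_structure() const { return m_structure.get(); }
  Structure *get_mutable_structure() { return m_structure.get(); }
};

int main() {
  Epoch epoch;
  // epoch.get_structure()->tagged_delete(42);  // error: pointer is const
  return epoch.get_mutable_structure()->tagged_delete(42) ? 0 : 1;
}
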
@@ -303,13 +304,14 @@ public:
auto epoch = get_active_epoch();
auto vers = epoch->get_structure();
- std::vector<ShardType *> shards;
-
- if (vers->get_levels().size() > 0) {
- for (int i = vers->get_levels().size() - 1; i >= 0; i--) {
- if (vers->get_levels()[i] &&
- vers->get_levels()[i]->get_record_count() > 0) {
- shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
+ std::vector<const ShardType *> shards;
+
+ if (vers->get_level_vector().size() > 0) {
+ for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) {
+ if (vers->get_level_vector()[i] &&
+ vers->get_level_vector()[i]->get_record_count() > 0) {
+ shards.emplace_back(
+ vers->get_level_vector()[i]->get_combined_shard());
}
}
}
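
Beyond the get_levels() to get_level_vector() rename, the collected shard pointers become const, so callers can no longer mutate shards owned by the structure. A self-contained sketch of the reverse level walk, with stand-in Level and Shard types:

#include <cstddef>
#include <cstdio>
#include <vector>

struct Shard { int id; };

struct Level {
  Shard shard;
  std::size_t records;
  std::size_t get_record_count() const { return records; }
  const Shard *get_combined_shard() const { return &shard; }
};

int main() {
  Level l0{{0}, 10}, l2{{2}, 5};
  std::vector<Level *> levels = {&l0, nullptr, &l2}; // index 1: empty slot
  std::vector<const Shard *> shards;
  // walk from the largest level index down to 0, skipping null/empty levels
  for (int i = (int)levels.size() - 1; i >= 0; i--) {
    if (levels[i] && levels[i]->get_record_count() > 0)
      shards.emplace_back(levels[i]->get_combined_shard());
  }
  for (const Shard *s : shards)
    std::printf("shard %d\n", s->id); // highest level first: 2, then 0
}
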
@@ -358,7 +360,8 @@ public:
*/
bool validate_tombstone_proportion() {
auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->validate_tombstone_proportion();
+ auto t = epoch->get_structure()->validate_tombstone_proportion(
+ m_max_delete_prop);
end_job(epoch);
return t;
}
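
The tombstone check now receives the threshold (m_max_delete_prop) from the caller instead of reading it from the structure's own state. A hedged sketch of what such a parameterized check might look like, assuming each level's tombstone-to-record ratio is compared against the bound (LevelStats is illustrative, not the framework's type):

#include <cstddef>
#include <vector>

struct LevelStats { std::size_t tombstones, records; };

bool validate_tombstone_proportion(const std::vector<LevelStats> &levels,
                                   double max_delete_prop) {
  for (const auto &l : levels) {
    // fail if any level exceeds the caller-supplied delete proportion
    if (l.records > 0 &&
        (double)l.tombstones / (double)l.records > max_delete_prop)
      return false;
  }
  return true;
}

int main() {
  std::vector<LevelStats> levels = {{5, 100}, {20, 100}};
  return validate_tombstone_proportion(levels, 0.25) ? 0 : 1; // passes
}
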
@@ -370,7 +373,6 @@ public:
void print_scheduler_statistics() const { m_sched.print_statistics(); }
private:
- ReconPolicyType const *m_recon_policy;
double m_max_delete_prop;
SchedType m_sched;
@@ -380,6 +382,8 @@ private:
std::atomic<int> m_next_core;
std::atomic<size_t> m_epoch_cnt;
+ ReconPolicyType const *m_recon_policy;
+
alignas(64) std::atomic<bool> m_reconstruction_scheduled;
std::atomic<epoch_ptr> m_next_epoch;
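
For context on the surrounding fields: the alignas(64) on the flag above is the usual false-sharing guard, giving a hot atomic its own cache line so writers do not invalidate neighboring members. A minimal illustration (64 bytes is assumed as the line size; std::hardware_destructive_interference_size from <new> is the portable C++17 spelling):

#include <atomic>
#include <cstddef>

struct Flags {
  alignas(64) std::atomic<bool> reconstruction_scheduled{false};
  alignas(64) std::atomic<std::size_t> epoch_cnt{0}; // separate cache line
};

int main() {
  Flags f;
  f.reconstruction_scheduled.store(true, std::memory_order_release);
  return f.reconstruction_scheduled.load(std::memory_order_acquire) ? 0 : 1;
}
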
@@ -547,39 +551,36 @@ private:
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
((DynamicExtension *)args->extension)->SetThreadAffinity();
- Structure *vers = args->epoch->get_structure();
+ Structure *vers = args->epoch->get_mutable_structure();
- for (size_t i=0; i<args->tasks.size(); i++) {
- vers->perform_reconstruction(args->tasks[i]);
- }
+ ReconstructionTask flush_task;
+ flush_task.type = ReconstructionType::Invalid;
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we
- * can flush as many records as possible in one go. The reconstruction
- * was done so as to make room for the full buffer anyway, so there's
- * no real benefit to doing this first.
- */
- auto buffer_view = args->epoch->get_buffer();
- size_t new_head = buffer_view.get_tail();
+ for (size_t i = 0; i < args->tasks.size(); i++) {
+ if (args->tasks[i].sources.size() > 0 &&
+ args->tasks[i].sources[0] == buffer_shid) {
+ flush_task = args->tasks[i];
+ continue;
+ }
- /*
- * if performing a compaction, don't flush the buffer, as
- * there is no guarantee that any necessary reconstructions
- * will free sufficient space in L0 to support a flush
- */
- if (!args->compaction) {
- vers->flush_buffer(std::move(buffer_view));
+ vers->perform_reconstruction(args->tasks[i]);
}
- args->result.set_value(true);
+ if (flush_task.type != ReconstructionType::Invalid) {
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
- /*
- * Compactions occur on an epoch _before_ it becomes active,
- * and as a result the active epoch should _not_ be advanced as
- * part of a compaction
- */
- if (!args->compaction) {
+ vers->perform_flush(flush_task, std::move(buffer_view));
+ args->result.set_value(true);
((DynamicExtension *)args->extension)->advance_epoch(new_head);
+ } else {
+ args->result.set_value(true);
}
((DynamicExtension *)args->extension)
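
The reworked job body scans the task list for a task sourced from the buffer, sets it aside as the flush task, performs all other reconstructions first, and only then flushes. A standalone sketch of that partitioning (the task and id types are made up, and buffer_shid stands in for whatever sentinel the framework uses for the buffer's shard id):

#include <cstddef>
#include <cstdio>
#include <vector>

enum class ReconstructionType { Invalid, Merge, Flush };

struct ReconstructionTask {
  ReconstructionType type = ReconstructionType::Invalid;
  std::vector<int> sources;
};

constexpr int buffer_shid = -1; // sentinel: "this task reads the buffer"

int main() {
  std::vector<ReconstructionTask> tasks = {
      {ReconstructionType::Merge, {3, 4}},
      {ReconstructionType::Flush, {buffer_shid}},
      {ReconstructionType::Merge, {1, 2}}};

  ReconstructionTask flush_task; // stays Invalid if no flush was scheduled
  for (std::size_t i = 0; i < tasks.size(); i++) {
    if (!tasks[i].sources.empty() && tasks[i].sources[0] == buffer_shid) {
      flush_task = tasks[i]; // set aside: the flush runs after the others
      continue;
    }
    std::printf("reconstruct from shard %d\n", tasks[i].sources[0]);
  }

  if (flush_task.type != ReconstructionType::Invalid)
    std::printf("flush buffer last\n"); // mirrors vers->perform_flush(...)
}
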
@@ -660,10 +661,12 @@ private:
args->tasks = m_recon_policy->get_reconstruction_tasks(
epoch, m_buffer->get_high_watermark());
args->extension = this;
- args->compaction = false;
- /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
- * here */
+ args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch));
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be freed
+ * here
+ */
m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
}
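
Appending the flush via get_flush_task() is what lets the args->compaction flag disappear entirely: the worker above now recognizes a flush by its buffer_shid source rather than by a boolean side channel, and the epoch is advanced only when a flush actually ran.
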
@@ -691,7 +694,8 @@ private:
return m_buffer->append(rec, ts);
}
-#ifdef _GNU_SOURCE
+//#ifdef _GNU_SOURCE
+#if 0
void SetThreadAffinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;