author     Douglas Rumbaugh <dbr4@psu.edu>  2025-04-10 13:57:04 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>  2025-04-10 13:57:04 -0400
commit     d129ac81281022107ebccc5197ee126dd1b8cf2a (patch)
tree       c44fad16e69d632ba89af5868ee033e20e818f01
parent     4e0e54776dd5d148d2169e2b889e753ff2dd88f7 (diff)
download   dynamic-extension-d129ac81281022107ebccc5197ee126dd1b8cf2a.tar.gz
DynamicExtension.h: Added the insertion rate limiter
The exact determination of *how* to use this mechanism will be contingent on some of the math I'm working on at the moment.
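For reference, the mechanism gates inserts by Bernoulli sampling (see the internal_append() hunk below): with the insertion rate set to some r < 1, an insert carrying a non-null gsl_rng is admitted with probability r and rejected otherwise, leaving the caller to retry; a null rng bypasses the gate entirely. A minimal sketch of the gate in isolation (illustrative only; `rate` and `rng` stand in for the member state used in the diff):

    #include <gsl/gsl_rng.h>

    /* Admit an insert with probability `rate`. gsl_rng_uniform() returns a
     * double uniformly distributed over [0, 1), so the comparison succeeds
     * with probability `rate`; a rate of 1 disables filtering. */
    static bool admit(double rate, gsl_rng *rng) {
      return rate >= 1.0 || gsl_rng_uniform(rng) < rate;
    }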
-rw-r--r--  include/framework/DynamicExtension.h | 150
1 file changed, 98 insertions(+), 52 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a65498c..dd6917c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -11,14 +11,15 @@
#include <atomic>
#include <cstdio>
+#include <gsl/gsl_rng.h>
#include <mutex>
-#include <vector>
#include <set>
+#include <vector>
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/SerialScheduler.h"
#include "framework/scheduling/LockManager.h"
+#include "framework/scheduling/SerialScheduler.h"
#include "framework/scheduling/Task.h"
#include "framework/structure/ExtensionStructure.h"
@@ -83,16 +84,17 @@ public:
* performing compactions and flushes, etc.
*/
DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
- m_buffer =
- std::make_unique<BufferType>(m_config.buffer_flush_trigger, m_config.buffer_size);
+ m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger,
+ m_config.buffer_size);
m_sched = std::make_unique<SchedType>(m_config.maximum_memory_usage,
m_config.maximum_threads);
- m_active_version.store(
- std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
+ m_active_version.store(std::make_shared<VersionType>(
+ INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
m_version_counter = INITIAL_VERSION;
m_preempt_version = INVALID_VERSION;
+ m_insertion_rate.store(1.0);
assert(m_config.recon_policy);
}
@@ -122,7 +124,9 @@ public:
* @return 1 on success, 0 on failure (in which case the insert should
* be retried)
*/
- int insert(const RecordType &rec) { return internal_append(rec, false); }
+ int insert(const RecordType &rec, gsl_rng *rng = nullptr) {
+ return internal_append(rec, false, rng);
+ }
/**
* Erases a record from the index, according to the DeletePolicy
@@ -380,13 +384,13 @@ public:
* Writes a schematic view of the currently active structure to
* stdout. Each level is on its own line, and each shard is represented.
*/
- void print_structure() {
- auto ver = get_active_version();
- auto bv = ver->get_buffer();
+ void print_structure() {
+ auto ver = get_active_version();
+ auto bv = ver->get_buffer();
- fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count());
- ver->get_structure()->print_structure();
- }
+ fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count());
+ ver->get_structure()->print_structure();
+ }
private:
ConfType m_config;
@@ -410,9 +414,13 @@ private:
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
- bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args, size_t version) {
+ alignas(64) std::atomic<double> m_insertion_rate;
+
+ bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
+ size_t version) {
if (version <= m_preempt_version.load()) {
- // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n", version);
+ // fprintf(stderr, "[I] Preempted query on version %ld, restarting...\n",
+ // version);
m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
return true;
}
@@ -422,9 +430,10 @@ private:
void preempt_queries() {
size_t vers = m_active_version.load()->get_id() - 1;
- if (vers != m_preempt_version.load()){
+ if (vers != m_preempt_version.load()) {
m_preempt_version.store(vers);
- // fprintf(stderr, "[I] Initiating query preemption on version %ld\n", vers);
+ // fprintf(stderr, "[I] Initiating query preemption on version %ld\n",
+ // vers);
}
}
@@ -479,15 +488,17 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
- // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", args->version->get_id(), recon_id);
+ // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
+ // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
new_head = buffview.get_tail();
- // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", buffview.get_head(), recon_id);
+ // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n",
+ // buffview.get_head(), recon_id);
reconstruction_results<ShardType> flush_recon;
flush_recon.target_level = 0;
@@ -505,7 +516,9 @@ private:
* shards in L0 to use), or use it (by using the "all_shards_idx"
* shard id).
*/
- args->version->get_mutable_structure()->append_shard(flush_recon.new_shard, args->version->get_id(), flush_recon.target_level);
+ args->version->get_mutable_structure()->append_shard(
+ flush_recon.new_shard, args->version->get_id(),
+ flush_recon.target_level);
/* advance the buffer head for a flush */
bool success = false;
@@ -515,28 +528,33 @@ private:
if (!success) {
failure_cnt++;
usleep(1);
- // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", args->version->get_id(), recon_id);
+ // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
if (failure_cnt >=
extension->m_config.buffer_flush_query_preemption_trigger) {
extension->preempt_queries();
if (failure_cnt > 500000) {
- // fprintf(stderr, "[C] Critical failure. Hung on version: %ld (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id);
+ // fprintf(stderr, "[C] Critical failure. Hung on version: %ld
+ // (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id);
}
}
}
}
- // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", new_head, recon_id);
+ // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n",
+ // new_head, recon_id);
} else {
- // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", recon_id);
+ // fprintf(stderr, "[I] Running background reconstruction (%ld)\n",
+ // recon_id);
}
/* perform all of the reconstructions */
auto structure = args->version->get_structure();
assert(structure);
- // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", structure->get_level_vector()[0]->get_shard_count(), recon_id);
+ // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n",
+ // structure->get_level_vector()[0]->get_shard_count(), recon_id);
for (size_t i = 0; i < args->tasks.size(); i++) {
reconstructions.emplace_back(
@@ -579,7 +597,8 @@ private:
for (auto recon : reconstructions) {
auto grow = args->version->get_mutable_structure()->append_shard(
recon.new_shard, args->version->get_id(), recon.target_level);
- args->version->get_mutable_structure()->delete_shards(recon.source_shards);
+ args->version->get_mutable_structure()->delete_shards(
+ recon.source_shards);
if (grow) {
extension->m_lock_mngr.add_lock();
}
@@ -587,20 +606,24 @@ private:
size_t new_reccnt = args->version->get_structure()->get_record_count();
- // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", args->version->get_structure()->get_level_vector()[0]->get_shard_count(), recon_id);
+ // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n",
+ // args->version->get_structure()->get_level_vector()[0]->get_shard_count(),
+ // recon_id);
/* for maintenance reconstructions, advance the buffer head to match the
* currently active version */
if (args->priority == ReconstructionPriority::MAINT) {
args->version->set_buffer(extension->m_buffer.get(),
active_version->get_head());
- // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", active_version->get_head(), recon_id);
+ // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n",
+ // active_version->get_head(), recon_id);
if (new_reccnt != cur_reccnt) {
fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id);
}
}
- // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, cur_reccnt, new_reccnt, recon_id);
+ // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt,
+ // cur_reccnt, new_reccnt, recon_id);
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
@@ -615,7 +638,8 @@ private:
}
for (auto level : locked_levels) {
- // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, recon_id);
+ // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level,
+ // recon_id);
extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
}
@@ -625,7 +649,8 @@ private:
extension->m_lock_mngr.release_buffer_lock();
}
- // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", args->version->get_id(), recon_id);
+ // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
+ // args->version->get_id(), recon_id);
/* manually delete the argument object */
delete args;
@@ -717,8 +742,9 @@ private:
size_t version_id = m_version_counter.fetch_add(1) + 1;
// fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
- std::shared_ptr<VersionType> new_version =
- std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
+ std::shared_ptr<VersionType> new_version = std::make_shared<VersionType>(
+ version_id, std::move(structure), m_buffer.get(),
+ active_version->get_buffer().get_head());
return new_version;
}
@@ -738,7 +764,8 @@ private:
return new_version;
}
- void install_new_version(version_ptr new_version, size_t old_active_version_id) {
+ void install_new_version(version_ptr new_version,
+ size_t old_active_version_id) {
assert(new_version->valid());
// fprintf(stderr, "\t[I] Installing version %ld\n", new_version->get_id());
@@ -754,7 +781,7 @@ private:
*/
m_active_version.store(new_version);
m_version_advance_cv.notify_all();
-
+
// fprintf(stderr, "\t[I] Installed version %ld\n", new_version->get_id());
}
@@ -781,13 +808,13 @@ private:
}
/*
- * Double check that we actually need to flush. I was seeing some
- * flushes running on empty buffers--somehow it seems like a new
- * flush was getting scheduled immediately after another one finished,
- * when it wasn't necessary. This should prevent that from happening.
- *
- * A bit of a kludge, but it *should* work
- */
+ * Double check that we actually need to flush. I was seeing some
+ * flushes running on empty buffers--somehow it seems like a new
+ * flush was getting scheduled immediately after another one finished,
+ * when it wasn't necessary. This should prevent that from happening.
+ *
+ * A bit of a kludge, but it *should* work
+ */
if (!m_buffer->is_at_low_watermark()) {
m_lock_mngr.release_buffer_lock();
end_reconstruction_scheduling();
@@ -797,7 +824,8 @@ private:
auto active_version = m_active_version.load();
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version = create_version_flush(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
+ args->version = create_version_flush(std::unique_ptr<StructureType>(
+ active_version->get_structure()->copy()));
args->tasks = m_config.recon_policy->get_flush_tasks(active_version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
@@ -810,8 +838,9 @@ private:
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- FLUSH);
- // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", args->version->get_id());
+ FLUSH);
+ // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n",
+ // args->version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -833,7 +862,8 @@ private:
// fprintf(stderr, "[I] Scheduling maintenance\n");
auto active_version = m_active_version.load();
- auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
+ auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(
+ active_version.get(), m_lock_mngr);
for (auto &recon : reconstructions) {
/*
@@ -841,13 +871,14 @@ private:
* freed here
*/
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
- args->tasks = std::move(recon);
+ args->version = create_version_maint(std::unique_ptr<StructureType>(
+ active_version->get_structure()->copy()));
+ args->tasks = std::move(recon);
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
args->initial_version = active_version->get_id();
- m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args,
- RECONSTRUCTION);
+ m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
+ args, RECONSTRUCTION);
}
if (take_reconstruction_lock) {
@@ -868,11 +899,26 @@ private:
return result;
}
- int internal_append(const RecordType &rec, bool ts) {
+ int internal_append(const RecordType &rec, bool ts, gsl_rng *rng = nullptr) {
if (m_buffer->is_at_low_watermark() && !m_lock_mngr.is_buffer_locked()) {
schedule_flush();
}
+ /*
+ * Insertion rate limiting is handled by Bernoulli Sampling to determine
+ * whether a given insert succeeds or not. If an insert is allowed to
+ * happen, a buffer append will be attempted (this might still fail,
+ * though). Otherwise, the insert will not be attempted and should
+ * be retried. If the insertion rate is unity, then no probabilistic
+ * filtering will occur.
+ */
+ if (m_insertion_rate.load() < 1 && rng) {
+ auto p = gsl_rng_uniform(rng);
+ if (p > m_insertion_rate.load()) {
+ return false;
+ }
+ }
+
/* this will fail if the HWM is reached and return 0 */
return m_buffer->append(rec, ts);
}
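A hedged usage sketch (not part of this commit): a gated insert returns 0 just like a full-buffer insert, so callers retry in a loop; each attempt is admitted with probability m_insertion_rate, meaning a retrying caller makes roughly 1/m_insertion_rate attempts per accepted record, which throttles aggregate insert pressure. Assuming a DynamicExtension instance `ext`, a record `rec`, and a caller-owned generator:

    gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);
    while (!ext.insert(rec, rng)) {
      usleep(1); /* rejected by the limiter, or buffer at its high watermark */
    }
    gsl_rng_free(rng);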