path: root/include/framework/DynamicExtension.h
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--  include/framework/DynamicExtension.h | 204
1 file changed, 133 insertions(+), 71 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 8cef4a1..762029e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -1,7 +1,7 @@
/*
* include/framework/DynamicExtension.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
@@ -161,7 +161,6 @@ public:
static_assert(std::same_as<SchedType, SerialScheduler>,
"Tagging is only supported in single-threaded operation");
- auto view = m_buffer->get_buffer_view();
auto version = get_active_version();
if (version->get_mutable_structure()->tagged_delete(rec)) {
return 1;
@@ -172,7 +171,7 @@ public:
* is the least likely place to find the record,
* so we'll check it last.
*/
- return view.delete_record(rec);
+ return version->get_buffer().delete_record(rec);
}
/*
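
The delete path above now resolves the buffer probe through the same version
object that served the structure probe, rather than through a separately
acquired buffer view, so both lookups observe a single consistent snapshot.
A minimal sketch of the resulting control flow, using mock types in place of
the framework's (only the probe order is taken from the diff):

/* Mock stand-ins; only the call order mirrors the change above. */
struct MockStructure {
  bool tagged_delete(int /*rec*/) { return false; } /* not in any shard */
};
struct MockBufferView {
  int delete_record(int /*rec*/) { return 0; }      /* not in the buffer */
};
struct MockVersion {
  MockStructure structure;
  MockBufferView buffer;
  MockStructure *get_mutable_structure() { return &structure; }
  MockBufferView &get_buffer() { return buffer; }
};

/* Structure first; the buffer, least likely to hold the record, last. */
int erase(MockVersion &v, int rec) {
  if (v.get_mutable_structure()->tagged_delete(rec))
    return 1;
  return v.get_buffer().delete_record(rec);
}
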
@@ -256,9 +255,8 @@ public:
* ShardType::get_memory_usage) and the buffer by the index.
*/
size_t get_memory_usage() {
- auto version = get_active_version();
auto t = m_buffer->get_memory_usage() +
- version->get_structure()->get_memory_usage();
+ get_active_version()->get_structure()->get_memory_usage();
return t;
}
@@ -288,8 +286,7 @@ public:
* the index. Ownership of this object is transferred to the
* caller.
*/
- // FIXME: switch this over to std::unique_ptr
- ShardType *
+ std::unique_ptr<ShardType>
create_static_structure(bool await_reconstruction_completion = false) {
if (await_reconstruction_completion) {
await_version();
@@ -322,7 +319,7 @@ public:
}
}
- ShardType *flattened = new ShardType(shards);
+ auto flattened = std::make_unique<ShardType>(shards);
for (auto shard : shards) {
delete shard;
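
Returning std::unique_ptr<ShardType> makes the ownership transfer described
in the comment above explicit in the signature. A caller-side sketch (the
helper and its name are hypothetical, not part of the header):

/* `ext` is any DynamicExtension instantiation. */
template <typename Extension>
void snapshot_and_query(Extension &ext) {
  /* `flat` owns the flattened shard and frees it at scope exit; the
   * old raw-pointer API required a manual delete here. */
  auto flat = ext.create_static_structure(/*await_reconstruction_completion=*/true);
  /* ... run queries against *flat ... */
} /* ~unique_ptr<ShardType> runs here */
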
@@ -348,7 +345,7 @@ public:
* of the counter
*/
if (vid == INVALID_VERSION) {
- vid = m_version_counter.load() - 1;
+ vid = m_version_counter.load();
}
/* versions signal on m_version_advance_cv when they activate */
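
This pairs with the fetch_add(1) + 1 adjustments further down (in the
reconstruction job and in create_version_flush): m_version_counter now holds
the id of the most recently *assigned* version rather than the next id to
hand out, so awaiting the "current" version means waiting on load() directly.
A standalone model of the arithmetic (standard library only):

#include <atomic>
#include <cstddef>
#include <cstdio>

std::atomic<std::size_t> version_counter{0}; /* id of last assigned version */

int main() {
  /* fetch_add returns the value *before* the increment, so the id given
   * to a newly created version is that value plus one... */
  std::size_t new_id = version_counter.fetch_add(1) + 1; /* new_id == 1 */
  /* ...and the most recently assigned id is then simply load(), which is
   * why await_version() no longer subtracts one. */
  std::printf("%zu %zu\n", new_id, version_counter.load()); /* prints: 1 1 */
}
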
@@ -384,7 +381,11 @@ public:
* stdout. Each level is on its own line, and each shard is represented.
*/
void print_structure() {
- get_active_version()->get_structure()->print_structure();
+ auto ver = get_active_version();
+ auto bv = ver->get_buffer();
+
+ fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count());
+ ver->get_structure()->print_structure();
}
private:
@@ -397,15 +398,15 @@ private:
std::atomic<int> m_next_core;
/* versioning + concurrency variables */
- std::atomic<size_t> m_version_counter;
- std::atomic<std::shared_ptr<VersionType>> m_active_version;
+ alignas(64) std::atomic<size_t> m_version_counter;
+ alignas(64) std::atomic<std::shared_ptr<VersionType>> m_active_version;
- std::condition_variable m_version_advance_cv;
- std::mutex m_version_advance_mtx;
+ alignas(64) std::condition_variable m_version_advance_cv;
+ alignas(64) std::mutex m_version_advance_mtx;
- LockManager m_lock_mngr;
+ alignas(64) LockManager m_lock_mngr;
- std::atomic<size_t> m_preempt_version;
+ alignas(64) std::atomic<size_t> m_preempt_version;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
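
The alignas(64) annotations place each hot synchronization variable on its
own cache line, so writers bumping m_version_counter do not invalidate the
line that readers of m_active_version or the condition variable are touching
(false sharing). 64 bytes is the usual x86-64 line size; where the toolchain
provides it, std::hardware_destructive_interference_size is the portable
spelling. A minimal illustration:

#include <atomic>
#include <cstddef>

/* Without alignas, `counter` and `flag` could share one cache line and
 * ping-pong between writer cores; with it, each gets its own line. */
struct SharedState {
  alignas(64) std::atomic<std::size_t> counter{0};
  alignas(64) std::atomic<bool> flag{false};
};
static_assert(sizeof(SharedState) == 128, "one 64-byte line per member");
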
@@ -460,7 +461,9 @@ private:
auto extension = (DynamicExtension *)args->extension;
extension->SetThreadAffinity();
-
+ // static std::atomic<size_t> cnt = 0;
+ // size_t recon_id = cnt.fetch_add(1);
+
size_t new_head = 0;
std::vector<reconstruction_results<ShardType>> reconstructions;
@@ -476,21 +479,24 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
+ // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
+ // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
new_head = buffview.get_tail();
+ // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n",
+ // buffview.get_head(), recon_id);
+
reconstruction_results<ShardType> flush_recon;
flush_recon.target_level = 0;
flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview));
reconstructions.push_back(flush_recon);
- // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
-
/* advance the buffer head for a flush */
bool success = false;
size_t failure_cnt = 0;
@@ -499,24 +505,39 @@ private:
if (!success) {
failure_cnt++;
usleep(1);
- // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id());
+ // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
- if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) {
+ if (failure_cnt >=
+ extension->m_config.buffer_flush_query_preemption_trigger) {
extension->preempt_queries();
+
+ if (failure_cnt > 500000) {
+ // fprintf(stderr,
+ // "[C] Critical failure. Hung on version: %ld (%ld)\n",
+ // extension->m_buffer->debug_get_old_head(), recon_id);
+ }
}
}
}
+ // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n",
+ // new_head, recon_id);
} else {
- // fprintf(stderr, "[I] Running background reconstruction\n");
+ // fprintf(stderr, "[I] Running background reconstruction (%ld)\n",
+ // recon_id);
}
/* perform all of the reconstructions */
auto structure = args->version->get_structure();
assert(structure);
-
+
+ // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n",
+ // structure->get_level_vector()[0]->get_shard_count(), recon_id);
+
for (size_t i = 0; i < args->tasks.size(); i++) {
- reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i]));
+ reconstructions.emplace_back(
+ structure->perform_reconstruction(args->tasks[i]));
}
/*
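
The flush path above spins on advance_head, sleeping one microsecond per
failure, and once the failure count crosses the configured
buffer_flush_query_preemption_trigger it preempts the in-flight queries
pinning the old head (the matching restart check appears in the query path
below). A condensed model of the loop, with stubs standing in for the
framework calls:

#include <cstddef>
#include <unistd.h> /* usleep */

/* Hypothetical stubs; only the retry/preempt shape mirrors the diff. */
static bool try_advance_head(std::size_t) { return true; }
static void preempt_queries() {}

static void advance_with_preemption(std::size_t new_head, std::size_t trigger) {
  std::size_t failures = 0;
  while (!try_advance_head(new_head)) {
    ++failures;
    usleep(1); /* brief backoff; readers usually release quickly */
    if (failures >= trigger)
      preempt_queries(); /* long queries restart and unpin the head */
  }
}
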
@@ -524,51 +545,90 @@ private:
* reconstruction isn't a flush), generate one.
*/
if (args->version->get_id() == INVALID_VERSION) {
- args->version->set_id(extension->m_version_counter.fetch_add(1));
- // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
+ assert(args->priority == ReconstructionPriority::MAINT);
+ args->version->set_id(extension->m_version_counter.fetch_add(1) + 1);
+ // fprintf(stderr, "\t[I] Reconstruction version assigned %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
+ } else {
+ assert(args->priority == ReconstructionPriority::FLUSH);
}
-
+
/* wait for our opportunity to install the updates */
extension->await_version(args->version->get_id() - 1);
- /* get a fresh copy of the structure and apply our updates */
- args->version->set_structure(std::move(std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy())));
+ // size_t old_reccnt = args->version->get_structure()->get_record_count();
+
+ /*
+ * this version *should* have an ID one less than the version we are
+ * currently constructing, and should be fully stable (i.e., only the
+ * buffer tail can change)
+ */
+ auto active_version = extension->get_active_version();
+ assert(active_version->get_id() == args->version->get_id() - 1);
+
+ /* get a fresh copy of the structure from the current version */
+ args->version->set_structure(std::move(std::unique_ptr<StructureType>(
+ active_version->get_structure()->copy())));
+
+ // size_t cur_reccnt = args->version->get_structure()->get_record_count();
+ /* apply our updates to the copied structure (adding/removing shards) */
for (auto recon : reconstructions) {
- auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level);
+ auto grow = args->version->get_mutable_structure()->append_shard(
+ recon.new_shard, args->version->get_id(), recon.target_level);
args->version->get_mutable_structure()->delete_shards(recon.source_shards);
if (grow) {
extension->m_lock_mngr.add_lock();
}
}
- /* grab the latest buffer head */
+ // size_t new_reccnt = args->version->get_structure()->get_record_count();
+
+ // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n",
+ // args->version->get_structure()->get_level_vector()[0]->get_shard_count(),
+ // recon_id);
+
+ /* for maintenance reconstructions, advance the buffer head to match the
+ * currently active version */
if (args->priority == ReconstructionPriority::MAINT) {
- args->version->set_head(extension->get_active_version()->get_head());
+ args->version->set_buffer(extension->m_buffer.get(),
+ active_version->get_head());
+ // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n",
+ // active_version->get_head(), recon_id);
+ // if (new_reccnt != cur_reccnt) {
+ // fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id);
+ // }
}
+ // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt,
+ // cur_reccnt, new_reccnt, recon_id);
+
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
/* maint reconstructions can now safely release their locks */
if (args->priority == ReconstructionPriority::MAINT) {
std::set<size_t> locked_levels;
- for (size_t i=0; i<args->tasks.size(); i++) {
+ for (size_t i = 0; i < args->tasks.size(); i++) {
for (auto source : args->tasks[i].sources) {
locked_levels.insert(source.level_idx);
}
}
for (auto level : locked_levels) {
- // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
- extension->m_lock_mngr.release_lock(level);
+ // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, recon_id);
+ extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
}
-
+
if (args->priority == ReconstructionPriority::FLUSH) {
+ // fprintf(stderr, "\t[I] releasing lock on buffer (%ld)\n", recon_id);
extension->m_lock_mngr.release_buffer_lock();
}
+ // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
+ // args->version->get_id(), recon_id);
+
/* manually delete the argument object */
delete args;
}
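
Stepping back, the install sequence in this job is: wait until version
id - 1 is active, copy that version's structure, splice in each
reconstruction's output shard while retiring its sources, then publish. A
miniature of the copy-apply-publish core, using the same C++20
atomic<shared_ptr> the member declarations above rely on (the await/ordering
machinery is elided):

#include <atomic>
#include <memory>
#include <vector>

/* Published structures are treated as immutable; an installer works on a
 * private copy and makes it visible with one atomic store. */
struct Structure { std::vector<int> shards; };

std::atomic<std::shared_ptr<Structure>> active{std::make_shared<Structure>()};

void install_delta(int new_shard) {
  auto next = std::make_shared<Structure>(*active.load()); /* fresh copy */
  next->shards.push_back(new_shard);                       /* apply delta */
  active.store(next); /* readers still holding the old version are fine */
}
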
@@ -596,6 +656,19 @@ private:
/* execute the local/buffer queries and combine the results into output */
QueryResult output;
do {
+ /*
+ * for query preemption--check if the query should be restarted
+ * to prevent blocking buffer flushes for too long
+ */
+ if (args->extension->restart_query(args, version->get_id())) {
+ /* clean up memory allocated for temporary query objects */
+ delete buffer_query;
+ for (size_t i = 0; i < local_queries.size(); i++) {
+ delete local_queries[i];
+ }
+ return;
+ }
+
std::vector<LocalResult> query_results(shards.size() + 1);
for (size_t i = 0; i < query_results.size(); i++) {
if (i == 0) { /* execute buffer query */
@@ -610,19 +683,6 @@ private:
if (query_results[i].size() > 0)
break;
}
-
- /*
- * for query preemption--check if the query should be restarted
- * to prevent blocking buffer flushes for too long
- */
- if (args->extension->restart_query(args, version->get_id())) {
- /* clean up memory allocated for temporary query objects */
- delete buffer_query;
- for (size_t i = 0; i < local_queries.size(); i++) {
- delete local_queries[i];
- }
- return;
- }
}
/*
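
Hoisting the restart_query check to the top of the do loop lets a preempted
query bail out before paying for a full round of buffer and shard probes
against a version it is about to abandon; previously it finished the round
and then discarded the results. The loop shape after the change, with
hypothetical stubs and the control flow condensed:

static bool restart_query() { return false; }
static void cleanup_local_queries() {}
static void probe_buffer_and_shards() {}
static bool query_complete() { return true; }

static void run_query() {
  do {
    if (restart_query()) {     /* checked first, before any probing */
      cleanup_local_queries(); /* free temporary per-shard query state */
      return;                  /* the preempted query is retried later */
    }
    probe_buffer_and_shards();
  } while (!query_complete());
}
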
@@ -656,7 +716,7 @@ private:
* early to minimize activation blocking.
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
- size_t version_id = m_version_counter.fetch_add(1);
+ size_t version_id = m_version_counter.fetch_add(1) + 1;
// fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
@@ -674,17 +734,16 @@ private:
*/
version_ptr create_version_maint(std::unique_ptr<StructureType> structure) {
auto active_version = get_active_version();
- version_ptr new_version =
- std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
+ version_ptr new_version = std::make_shared<VersionType>(
+ INVALID_VERSION, std::move(structure), nullptr, 0);
return new_version;
}
void install_new_version(version_ptr new_version, size_t old_active_version_id) {
- assert(new_version->get_structure());
- assert(new_version->get_id() != INVALID_VERSION);
+ assert(new_version->valid());
- // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ // fprintf(stderr, "\t[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -698,11 +757,7 @@ private:
m_active_version.store(new_version);
m_version_advance_cv.notify_all();
- // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
- }
-
- StructureType *create_scratch_structure() {
- return get_active_version()->get_structure()->copy();
+ // fprintf(stderr, "\t[I] Installed version %ld\n", new_version->get_id());
}
void begin_reconstruction_scheduling() {
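
With these changes a maintenance version is built in two phases: it starts
with INVALID_VERSION and no buffer binding, receives its id inside the
reconstruction job (the fetch_add above), and is bound to the then-current
buffer head via set_buffer only at install time; the consolidated
new_version->valid() assertion presumably checks that both phases completed.
A schematic of the two-phase pattern (the sentinel value and field layout
are assumptions, not taken from the header):

#include <cstddef>

constexpr std::size_t INVALID_VERSION = ~static_cast<std::size_t>(0); /* assumed */

/* Constructed unbound; given an id and a buffer head only once the
 * reconstruction knows its place in the version order. */
struct VersionSketch {
  std::size_t id = INVALID_VERSION;
  const void *buffer = nullptr;
  std::size_t head = 0;

  void set_id(std::size_t v) { id = v; }
  void set_buffer(const void *b, std::size_t h) { buffer = b; head = h; }
  bool valid() const { return id != INVALID_VERSION && buffer != nullptr; }
};
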
@@ -727,6 +782,20 @@ private:
return;
}
+ /*
+ * Double check that we actually need to flush. Occasionally a new
+ * flush was scheduled immediately after another one finished, even
+ * though the buffer was empty and no further flush was necessary.
+ * This check should prevent that from happening.
+ *
+ * A bit of a kludge, but it *should* work.
+ */
+ if (!m_buffer->is_at_low_watermark()) {
+ m_lock_mngr.release_buffer_lock();
+ end_reconstruction_scheduling();
+ return;
+ }
+
auto active_version = m_active_version.load();
auto *args = new ReconstructionArgs<ShardType, QueryType>();
@@ -768,10 +837,6 @@ private:
auto active_version = m_active_version.load();
auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
- if (reconstructions.size() == 0) {
- // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
- }
-
for (auto &recon : reconstructions) {
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
@@ -783,7 +848,7 @@ private:
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
args->initial_version = active_version->get_id();
- m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args,
RECONSTRUCTION);
}
@@ -806,11 +871,8 @@ private:
}
int internal_append(const RecordType &rec, bool ts) {
- if (m_buffer->is_at_low_watermark()) {
- auto old = false;
- if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) {
- schedule_flush();
- }
+ if (m_buffer->is_at_low_watermark() && !m_lock_mngr.is_buffer_locked()) {
+ schedule_flush();
}
/* this will fail if the HWM is reached and return 0 */
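
Finally, internal_append now treats the lock manager's buffer lock as the
"flush already in flight" indicator, replacing the old compare-exchange on
m_scheduling_reconstruction; together with the low-watermark double check
added further up, a redundant flush is either never scheduled here or
releases the lock and exits there. A schematic of the gate (stub
predicates, not the real API):

/* Hypothetical stubs; only the gating logic mirrors the diff. */
static bool at_low_watermark() { return true; }
static bool buffer_locked() { return false; } /* true while a flush owns it */
static void schedule_flush() {}

static int append_sketch(int /*rec*/) {
  /* schedule a flush only when the buffer is filling AND no flush
   * already holds the buffer lock */
  if (at_low_watermark() && !buffer_locked())
    schedule_flush();
  /* the insertion itself is elided; it fails (returns 0) once the high
   * watermark is reached, as the comment above notes */
  return 1;
}
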