summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-02-11 17:53:44 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-02-11 17:53:44 -0500
commit125e243cad99aa29444759e15053fd148ff0e3ba (patch)
treeb6c61fd3276cf56ea1fd5e90a7149e3d7f424e5d
parent85afe4ef04f327862460570fb0aa4c30afcf7cc7 (diff)
downloaddynamic-extension-125e243cad99aa29444759e15053fd148ff0e3ba.tar.gz
more updates
-rw-r--r--benchmarks/tail-latency/standard_latency_dist.cpp1
-rw-r--r--include/framework/DynamicExtension.h41
-rw-r--r--include/framework/reconstruction/BackgroundTieringPolicy.h3
3 files changed, 29 insertions, 16 deletions
diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp
index 7cce48c..34949d4 100644
--- a/benchmarks/tail-latency/standard_latency_dist.cpp
+++ b/benchmarks/tail-latency/standard_latency_dist.cpp
@@ -79,6 +79,7 @@ int main(int argc, char **argv) {
TIMER_START();
for (size_t j=warmup; j<data.size(); j++) {
while (!extension->insert(data[j])) {
+ fprintf(stderr, "[B] %ld %ld\n", j, extension->get_record_count());
usleep(1);
}
}
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index aa07659..c2a59ea 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -13,6 +13,7 @@
#include <cstdio>
#include <mutex>
#include <vector>
+#include <set>
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
@@ -454,7 +455,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
assert(extension->m_flush_in_progress.load());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
@@ -488,7 +489,7 @@ private:
args->version->advance_buffer_head(new_head);
} else {
- // fprintf(stderr, "[I] Running background reconstruction\n");
+ fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
@@ -505,7 +506,7 @@ private:
*/
if (args->version->get_id() == INVALID_VERSION) {
args->version->set_id(extension->m_version_counter.fetch_add(1));
- // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
/* advance the index to the newly finished version */
@@ -513,18 +514,26 @@ private:
/* maint reconstructions can now safely release their locks */
if (args->priority == ReconstructionPriority::MAINT) {
+ std::set<size_t> locked_levels;
for (size_t i=0; i<args->tasks.size(); i++) {
for (auto source : args->tasks[i].sources) {
- extension->m_lock_mngr.release_lock(source.level_idx);
+ locked_levels.insert(source.level_idx);
}
}
+
+ for (auto level : locked_levels) {
+ fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
+ extension->m_lock_mngr.release_lock(level);
+ }
}
if (args->priority == ReconstructionPriority::FLUSH) {
extension->m_flush_in_progress.store(false);
- // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
} else {
- // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
+ fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
+ extension->print_structure();
+ fflush(stdout);
}
/* manually delete the argument object */
@@ -602,7 +611,7 @@ private:
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
- // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
+ fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -629,7 +638,7 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -654,7 +663,7 @@ private:
auto lk = std::unique_lock(m_version_advance_mtx);
m_version_advance_cv.notify_all();
- // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
+ fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
}
StructureType *create_scratch_structure() {
@@ -712,14 +721,14 @@ private:
args->priority = ReconstructionPriority::FLUSH;
args->initial_version = INVALID_VERSION;
- // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
+ fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
FLUSH);
- // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
+ fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -738,14 +747,14 @@ private:
begin_reconstruction_scheduling();
}
- // fprintf(stderr, "[I] Scheduling maintenance\n");
+ fprintf(stderr, "[I] Scheduling maintenance\n");
auto active_version = m_active_version.load();
auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
- // if (reconstructions.size() == 0) {
- // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
- // }
+ if (reconstructions.size() == 0) {
+ fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ }
for (auto &recon : reconstructions) {
/*
@@ -793,7 +802,7 @@ private:
if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) {
schedule_maint_reconstruction(true);
- // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
+ fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
return 0;
}
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 5a82695..a1bbddc 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -45,6 +45,7 @@ public:
for (level_index i = target_level; i > source_level; i--) {
if (lock_mngr.take_lock(i-1)) {
+ fprintf(stderr, "[I] Taking lock on %ld (%ld)\n", i-1, version->get_id());
ReconstructionVector recon;
size_t target_reccnt =
(i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
@@ -56,6 +57,8 @@ public:
recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
reconstructions.push_back(recon);
+ } else {
+ fprintf(stderr, "[I] Failed to get lock on %ld (%ld)\n", i-1, version->get_id());
}
}