summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-04-07 14:37:19 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-04-07 14:37:19 -0400
commita9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1 (patch)
treec3c404223d852b64a57d5265221c4a2d05d0af73 /include/framework
parent6bdcf74ad91e0efaa8c2e4339f5085fde8a7982b (diff)
downloaddynamic-extension-a9ee3f2f7f557adbe5448e4f4fbf71c23fe808b1.tar.gz
Implemented the legacy policies and fixed a few bugs
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h50
-rw-r--r--include/framework/reconstruction/BSMPolicy.h38
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h62
-rw-r--r--include/framework/scheduling/FIFOScheduler.h3
-rw-r--r--include/framework/structure/ExtensionStructure.h21
-rw-r--r--include/framework/structure/InternalLevel.h16
6 files changed, 109 insertions, 81 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 9d76813..a65498c 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -479,17 +479,15 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
- // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
- // args->version->get_id(), recon_id);
+ // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
+ // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", args->version->get_id(), recon_id);
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
new_head = buffview.get_tail();
- // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n",
- // buffview.get_head(), recon_id);
+ // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n", buffview.get_head(), recon_id);
reconstruction_results<ShardType> flush_recon;
flush_recon.target_level = 0;
@@ -497,6 +495,18 @@ private:
reconstructions.push_back(flush_recon);
+ /*
+ * Eager policies need access to the flushed shard immediately, so
+ * we add it to the current structure. This gives such policies
+ * access to it within their own reconstructions later.
+ *
+ * This is necessary for Leveling and BSM specifically. Tiering-based
+ * policies can ignore this shard (by explicitly specifying the
+ * shards in L0 to use), or use it (by using the "all_shards_idx"
+ * shard id).
+ */
+ args->version->get_mutable_structure()->append_shard(flush_recon.new_shard, args->version->get_id(), flush_recon.target_level);
+
/* advance the buffer head for a flush */
bool success = false;
size_t failure_cnt = 0;
@@ -505,35 +515,28 @@ private:
if (!success) {
failure_cnt++;
usleep(1);
- // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n",
- // args->version->get_id(), recon_id);
+ // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n", args->version->get_id(), recon_id);
if (failure_cnt >=
extension->m_config.buffer_flush_query_preemption_trigger) {
extension->preempt_queries();
if (failure_cnt > 500000) {
- // fprintf(stderr,
- // "[C] Critical failure. Hung on version: %ld (%ld)\n",
- // extension->m_buffer->debug_get_old_head(), recon_id);
+ // fprintf(stderr, "[C] Critical failure. Hung on version: %ld (%ld)\n", extension->m_buffer->debug_get_old_head(), recon_id);
}
}
}
}
-
- // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n",
- // new_head, recon_id);
+ // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n", new_head, recon_id);
} else {
- // fprintf(stderr, "[I] Running background reconstruction (%ld)\n",
- // recon_id);
+ // fprintf(stderr, "[I] Running background reconstruction (%ld)\n", recon_id);
}
/* perform all of the reconstructions */
auto structure = args->version->get_structure();
assert(structure);
- // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n",
- // structure->get_level_vector()[0]->get_shard_count(), recon_id);
+ // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n", structure->get_level_vector()[0]->get_shard_count(), recon_id);
for (size_t i = 0; i < args->tasks.size(); i++) {
reconstructions.emplace_back(
@@ -584,24 +587,20 @@ private:
size_t new_reccnt = args->version->get_structure()->get_record_count();
- // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n",
- // args->version->get_structure()->get_level_vector()[0]->get_shard_count(),
- // recon_id);
+ // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n", args->version->get_structure()->get_level_vector()[0]->get_shard_count(), recon_id);
/* for maintenance reconstructions, advance the buffer head to match the
* currently active version */
if (args->priority == ReconstructionPriority::MAINT) {
args->version->set_buffer(extension->m_buffer.get(),
active_version->get_head());
- // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n",
- // active_version->get_head(), recon_id);
+ // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n", active_version->get_head(), recon_id);
if (new_reccnt != cur_reccnt) {
fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id);
}
}
- // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt,
- // cur_reccnt, new_reccnt, recon_id);
+ // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt, cur_reccnt, new_reccnt, recon_id);
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
@@ -626,8 +625,7 @@ private:
extension->m_lock_mngr.release_buffer_lock();
}
- // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
- // args->version->get_id(), recon_id);
+ // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n", args->version->get_id(), recon_id);
/* manually delete the argument object */
delete args;
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index ae17182..42ae506 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -21,11 +21,13 @@ class BSMPolicy : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- BSMPolicy(size_t buffer_size, size_t scale_factor, size_t modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {}
+ BSMPolicy(size_t buffer_size, size_t scale_factor, size_t modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(modifier) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -45,23 +47,31 @@ public:
ReconstructionTask task;
task.target = target_level;
- task.type = ReconstructionType::Merge;
- std::vector<ShardID> source_shards;
- size_t reccnt = 0;
+ if (target_level == 1 &&
+ (levels.size() == 1 || levels[1]->get_record_count() == 0)) {
+ /* if the first level is empty, then we just append the buffer to it */
+ task.type = ReconstructionType::Append;
+ } else {
+ /* otherwise, we'll need to do a merge of at least two shards */
+ task.type = ReconstructionType::Merge;
+ }
- source_shards.push_back({0, all_shards_idx});
+ size_t reccnt = 0;
+ if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) {
+ task.sources.push_back({target_level, all_shards_idx});
+ }
- for (level_index i = target_level; i > source_level; i--) {
- if (i < (level_index)levels.size()) {
- source_shards.push_back({i-1, all_shards_idx});
- reccnt += levels[i-1]->get_record_count();
- }
+ for (level_index i = target_level - 1; i >= source_level; i--) {
+ assert(i < (ssize_t)levels.size());
+ task.sources.push_back({i, all_shards_idx});
+ reccnt += levels[i]->get_record_count();
}
- assert(source_shards.size() > 0);
+ task.reccnt = reccnt;
+ assert(task.sources.size() > 0);
+ reconstructions.add_reconstruction(task);
- reconstructions.add_reconstruction(source_shards, target_level, reccnt, ReconstructionType::Merge);
return reconstructions;
}
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index d448fee..6e20cdb 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -21,11 +21,13 @@ class LevelingPolicy : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- LevelingPolicy(size_t scale_factor, size_t buffer_size, size_t modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {}
+ LevelingPolicy(size_t scale_factor, size_t buffer_size, size_t modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(modifier) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -34,15 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- /*
- * on the very first flush, the whole structure will be empty, so
- * there isn't anything to merge into level 1 from level 0 yet.
- */
- if (version->get_structure()->get_record_count() == 0) {
- return reconstructions;
- }
-
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -51,29 +46,43 @@ public:
target_level = levels.size();
}
- for (level_index i = target_level; i > source_level; i--) {
- size_t target_reccnt =
- (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
- size_t total_reccnt =
- (i == 0) ? m_buffer_size + target_reccnt
- : levels[i - 1]->get_record_count() + target_reccnt;
+ /*
+ * For leveling, the only "actual" reconstruction happens at the target
+ * level. All the other reconstructions simply shift the levels down
+ * without needing to do any rebuilding.
+ */
+ size_t target_reccnt = (target_level < (level_index)levels.size())
+ ? levels[target_level]->get_record_count()
+ : 0;
+ size_t total_reccnt =
+ (target_level == 1)
+ ? m_buffer_size + target_reccnt
+ : levels[target_level - 1]->get_record_count() + target_reccnt;
+ auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append
+ : ReconstructionType::Merge;
+ reconstructions.add_reconstruction(target_level - 1, target_level,
+ total_reccnt, type);
- if (total_reccnt > 0) {
- auto type = (i >= (level_index) levels.size()) ? ReconstructionType::Append : ReconstructionType::Merge;
- reconstructions.add_reconstruction(i - 1, i, total_reccnt, type);
- }
+ /*
+ * For all other levels, we'll shift them down by using a single-source
+ * append.
+ */
+ for (level_index i = target_level - 1; i > source_level; i--) {
+ reconstructions.add_reconstruction(i - 1, i, 0,
+ ReconstructionType::Append);
}
-
return reconstructions;
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
size_t incoming_records = m_buffer_size;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
- if (levels[i]->get_record_count() + incoming_records < capacity(i, reccnt)) {
+ if (levels[i]->get_record_count() + incoming_records <
+ capacity(i, reccnt)) {
target_level = i;
break;
}
@@ -85,7 +94,8 @@ private:
}
inline size_t capacity(level_index level, size_t reccnt) const {
- return m_buffer_size * pow(m_scale_factor * pow(std::log(reccnt), m_size_modifier), level);
+ return m_buffer_size *
+ pow(m_scale_factor * pow(std::log(reccnt), m_size_modifier), level);
}
size_t m_scale_factor;
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 16fe111..6875aca 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -61,6 +61,7 @@ public:
m_stats.job_queued(ts, type, size);
m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
+
m_cv.notify_all();
}
@@ -107,8 +108,8 @@ private:
}
void run() {
+ std::unique_lock<std::mutex> cv_lock(m_cv_lock);
do {
- std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index a77088f..0fd737c 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -170,8 +170,23 @@ public:
inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
reconstruction_results<ShardType> result;
result.target_level = task.target;
-
- std::vector<const ShardType *> shards;
+
+ /* if there is only one source, then we don't need to actually rebuild */
+ if (task.sources.size() == 1) {
+ auto shid = task.sources[0];
+ if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) {
+ /* there's more than one shard, so we need to do the reconstruction */
+ } else {
+ auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+ assert(raw_shard_ptr);
+ result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
+ result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
+
+ return result;
+ }
+ }
+
+ std::vector<const ShardType*> shards;
for (ShardID shid : task.sources) {
assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
@@ -233,7 +248,7 @@ public:
m_levels[shards[i].first]->delete_shard(shard_idx);
} else {
fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, shards[i].second);
- //exit(EXIT_FAILURE);
+ exit(EXIT_FAILURE);
}
}
}
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 5659c72..54b3ae2 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -217,17 +217,11 @@ public:
m_shards.push_back(shard);
}
- const ShardType *get_shard(ShardID shid) const {
- if (shid < m_shards.size()) {
- return m_shards[shid].first.get();
- }
-
- return nullptr;
- }
-
- const shard_ptr get_shard_ptr(size_t shid) const {
- if (shid < m_shards.size()) {
- return m_shards[shid];
+ const shard_ptr get_shard_ptr(ssize_t idx) const {
+ if (idx >= 0 && idx < (ssize_t) m_shards.size()) {
+ return m_shards[idx];
+ } else if (idx == all_shards_idx && m_shards.size() == 1) {
+ return m_shards[0];
}
return {nullptr, 0};