summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-17 18:22:00 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-17 18:22:00 -0500
commit138c793b0a58577713d98c98bb140cf1d9c79bee (patch)
tree921197e2ba521704cb379ac8069189e70f8dee3d /include/framework/DynamicExtension.h
parent2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff)
downloaddynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test.
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h91
1 files changed, 30 insertions, 61 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index cb21ae3..7590de2 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -49,7 +49,7 @@ public:
, m_buffer(new Buffer(buffer_lwm, buffer_hwm))
{
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(0, vers, m_buffer);
+ auto epoch = new _Epoch(0, vers, m_buffer, 0);
m_versions.insert(vers);
m_epochs.insert({0, epoch});
@@ -169,6 +169,15 @@ public:
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
+
+ if (vers->get_levels().size() > 0) {
+ for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
+ shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
+ }
+ }
+ }
+
/*
* construct a shard from the buffer view. We'll hold the view
* for as short a time as possible: once the records are exfiltrated
@@ -182,24 +191,7 @@ public:
}
}
- if (vers->get_levels().size() > 0) {
- for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
- shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
- }
- }
- }
-
- Shard *shards_array[shards.size()];
-
- size_t j = 0;
- for (size_t i=0; i<shards.size(); i++) {
- if (shards[i]) {
- shards_array[j++] = shards[i];
- }
- }
-
- Shard *flattened = new S(shards_array, j);
+ Shard *flattened = new S(shards);
for (auto shard : shards) {
delete shard;
@@ -302,7 +294,7 @@ private:
return m_epochs[cur_epoch];
}
- void advance_epoch() {
+ void advance_epoch(size_t buffer_head) {
m_epoch_transition_lk.lock();
@@ -322,26 +314,14 @@ private:
*/
enforce_delete_invariant(new_epoch);
- #if 0
- /*
- * Update the new Epoch to contain the buffers from the old one
- * that it doesn't currently have if using a multi-threaded
- * scheduler (otherwise, there is only one buffer that is
- * reused, so it shouldn't be removed)
- */
- if constexpr (!std::same_as<SCHED, SerialScheduler>) {
- size_t old_buffer_cnt = new_epoch->clear_buffers();
-
- /*
- * skip the first buffer, as this was flushed into the epoch's
- * structure already, and copy all the other buffer references
- * into the new epoch
- */
- for (size_t i=1; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers()[i]);
- }
- }
- #endif
+ // FIXME: this may currently fail because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ auto res = new_epoch->advance_buffer_head(buffer_head);
+ assert(res);
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
@@ -425,40 +405,29 @@ private:
}
static void reconstruction(void *arguments) {
- ReconstructionArgs<R, S, Q, L> *args = (ReconstructionArgs<R, S, Q, L> *) arguments;
-
+ auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
Structure *vers = args->epoch->get_structure();
-
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we can
- * flush as many records as possible in one go. The reconstruction was done so
- * as to make room for the full buffer anyway, so there's no real benefit to doing
- * this first.
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
*/
auto buffer_view = args->epoch->get_buffer();
size_t new_head = buffer_view.get_tail();
- /*
- * if performing a compaction, don't flush the buffer, as
- * there is no guarantee that any necessary reconstructions
+ /*
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
*/
if (!args->compaction) {
vers->flush_buffer(std::move(buffer_view));
-
- // FIXME: this may currently fail because there isn't any
- // query preemption yet. At this point, we'd need to either
- // 1) wait for all queries on the old_head to finish
- // 2) kill all queries on the old_head
- // 3) somehow migrate all queries on the old_head to the new
- // version
- auto res = args->epoch->advance_buffer_head(new_head);
- assert(res);
}
args->epoch->end_job();
@@ -470,7 +439,7 @@ private:
* part of a compaction
*/
if (!args->compaction) {
- ((DynamicExtension *) args->extension)->advance_epoch();
+ ((DynamicExtension *) args->extension)->advance_epoch(new_head);
}
((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false;