author      Douglas Rumbaugh <dbr4@psu.edu>    2024-01-17 18:22:00 -0500
committer   Douglas Rumbaugh <dbr4@psu.edu>    2024-01-17 18:22:00 -0500
commit      138c793b0a58577713d98c98bb140cf1d9c79bee (patch)
tree        921197e2ba521704cb379ac8069189e70f8dee3d /include/framework
parent      2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff)
download    dynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were causing records to go missing. The core problems all appear to be fixed, though one outstanding issue remains: tombstones are not being completely canceled, and a very small number of them appear in the wrong order during the static structure test.
Diffstat (limited to 'include/framework')
-rw-r--r--   include/framework/DynamicExtension.h          91
-rw-r--r--   include/framework/interface/Shard.h            5
-rw-r--r--   include/framework/scheduling/Epoch.h          51
-rw-r--r--   include/framework/structure/BufferView.h       2
-rw-r--r--   include/framework/structure/InternalLevel.h   26
-rw-r--r--   include/framework/structure/MutableBuffer.h   58
6 files changed, 107 insertions, 126 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index cb21ae3..7590de2 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -49,7 +49,7 @@ public:
, m_buffer(new Buffer(buffer_lwm, buffer_hwm))
{
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(0, vers, m_buffer);
+ auto epoch = new _Epoch(0, vers, m_buffer, 0);
m_versions.insert(vers);
m_epochs.insert({0, epoch});
@@ -169,6 +169,15 @@ public:
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
+
+ if (vers->get_levels().size() > 0) {
+ for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
+ shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
+ }
+ }
+ }
+
/*
* construct a shard from the buffer view. We'll hold the view
* for as short a time as possible: once the records are exfiltrated
@@ -182,24 +191,7 @@ public:
}
}
- if (vers->get_levels().size() > 0) {
- for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
- shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
- }
- }
- }
-
- Shard *shards_array[shards.size()];
-
- size_t j = 0;
- for (size_t i=0; i<shards.size(); i++) {
- if (shards[i]) {
- shards_array[j++] = shards[i];
- }
- }
-
- Shard *flattened = new S(shards_array, j);
+ Shard *flattened = new S(shards);
for (auto shard : shards) {
delete shard;
@@ -302,7 +294,7 @@ private:
return m_epochs[cur_epoch];
}
- void advance_epoch() {
+ void advance_epoch(size_t buffer_head) {
m_epoch_transition_lk.lock();
@@ -322,26 +314,14 @@ private:
*/
enforce_delete_invariant(new_epoch);
- #if 0
- /*
- * Update the new Epoch to contain the buffers from the old one
- * that it doesn't currently have if using a multi-threaded
- * scheduler (otherwise, there is only one buffer that is
- * reused, so it shouldn't be removed)
- */
- if constexpr (!std::same_as<SCHED, SerialScheduler>) {
- size_t old_buffer_cnt = new_epoch->clear_buffers();
-
- /*
- * skip the first buffer, as this was flushed into the epoch's
- * structure already, and copy all the other buffer references
- * into the new epoch
- */
- for (size_t i=1; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers()[i]);
- }
- }
- #endif
+ // FIXME: this may currently fail because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ auto res = new_epoch->advance_buffer_head(buffer_head);
+ assert(res);
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
@@ -425,40 +405,29 @@ private:
}
static void reconstruction(void *arguments) {
- ReconstructionArgs<R, S, Q, L> *args = (ReconstructionArgs<R, S, Q, L> *) arguments;
-
+ auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
Structure *vers = args->epoch->get_structure();
-
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we can
- * flush as many records as possible in one go. The reconstruction was done so
- * as to make room for the full buffer anyway, so there's no real benefit to doing
- * this first.
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
*/
auto buffer_view = args->epoch->get_buffer();
size_t new_head = buffer_view.get_tail();
- /*
- * if performing a compaction, don't flush the buffer, as
- * there is no guarantee that any necessary reconstructions
+ /*
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
*/
if (!args->compaction) {
vers->flush_buffer(std::move(buffer_view));
-
- // FIXME: this may currently fail because there isn't any
- // query preemption yet. At this point, we'd need to either
- // 1) wait for all queries on the old_head to finish
- // 2) kill all queries on the old_head
- // 3) somehow migrate all queries on the old_head to the new
- // version
- auto res = args->epoch->advance_buffer_head(new_head);
- assert(res);
}
args->epoch->end_job();
@@ -470,7 +439,7 @@ private:
* part of a compaction
*/
if (!args->compaction) {
- ((DynamicExtension *) args->extension)->advance_epoch();
+ ((DynamicExtension *) args->extension)->advance_epoch(new_head);
}
((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false;
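
The net effect of the DynamicExtension.h hunks above: the flatten path now gathers level shards before taking the buffer view, so the vector handed to the single S(shards) constructor runs oldest-to-newest (deepest level first, buffer last), the order tombstone cancellation relies on; and reconstruction() captures new_head = buffer_view.get_tail() before flushing, then hands it to advance_epoch(), which applies it to the newly installed epoch rather than the old one. A minimal sketch of the ordering, with stand-in Shard and Level types (the real classes live in the framework headers):

#include <cstddef>
#include <vector>

struct Shard { /* stand-in for a ShardInterface type */ };
struct Level {
    size_t records = 0;
    Shard *get_combined_shard() { return new Shard(); }
};

/*
 * Collect shards oldest-first: deepest level up through level 0, then
 * the buffer's shard last, so newer tombstones follow the records they
 * may cancel.
 */
std::vector<Shard *> collect_shards(std::vector<Level *> &levels,
                                    Shard *buffer_shard) {
    std::vector<Shard *> shards;
    for (auto i = static_cast<std::ptrdiff_t>(levels.size()) - 1; i >= 0; i--) {
        if (levels[i] && levels[i]->records > 0) {
            shards.emplace_back(levels[i]->get_combined_shard());
        }
    }
    shards.emplace_back(buffer_shard); /* newest records last */
    return shards;
}
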
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
index 2357795..8c4db34 100644
--- a/include/framework/interface/Shard.h
+++ b/include/framework/interface/Shard.h
@@ -12,6 +12,7 @@
#include "util/types.h"
#include "framework/interface/Record.h"
+#include <vector>
namespace de {
@@ -19,8 +20,8 @@ namespace de {
// determining a good way to handle additional template arguments
// to get the Record type into play
template <typename S>
-concept ShardInterface = requires(S s, S **spp, void *p, bool b, size_t i) {
- {S(spp, i)};
+concept ShardInterface = requires(S s, std::vector<S*> spp, void *p, bool b, size_t i) {
+ {S(spp)};
/*
{S(mutable buffer)}
{s.point_lookup(r, b) } -> std::convertible_to<void*>
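
With this change, a shard type must be constructible from std::vector<S*> rather than from a raw pointer array plus a count. A minimal sketch of a conforming type, against a simplified one-requirement version of the concept (the real ShardInterface carries additional, still-commented requirements):

#include <vector>

/* Simplified form of the updated requirement: construction from a
 * vector of shard pointers instead of (S**, size_t). */
template <typename S>
concept VectorConstructible = requires(std::vector<S *> spp) {
    { S(spp) };
};

struct ExampleShard {
    explicit ExampleShard(std::vector<ExampleShard *> /*sources*/) {}
};

static_assert(VectorConstructible<ExampleShard>);
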
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index b005ff6..45ee17d 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -32,15 +32,17 @@ public:
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
+ , m_buffer_head(0)
{}
- Epoch(size_t number, Structure *structure, Buffer *buff)
+ Epoch(size_t number, Structure *structure, Buffer *buff, size_t head)
: m_buffer(buff)
, m_structure(structure)
, m_active_jobs(0)
, m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
+ , m_buffer_head(head)
{
structure->take_reference();
}
@@ -48,22 +50,21 @@ public:
~Epoch() {
assert(m_active_jobs.load() == 0);
- /* FIXME: this is needed to keep the destructor from
- * sometimes locking up here. But there *shouldn't* be
- * any threads waiting on this signal at object destruction,
- * so something else is going on here that needs looked into
+ /* FIXME: this is needed to keep the destructor from sometimes locking
+ * up here. But there *shouldn't* be any threads waiting on this signal
+ * at object destruction, so something else is going on here that needs
+ * looked into
*/
- //m_active_cv.notify_all();
+ // m_active_cv.notify_all();
if (m_structure) {
m_structure->release_reference();
}
}
-
- /*
- * Epochs are *not* copyable or movable. Only one can exist, and all users of
- * it work with pointers
+ /*
+ * Epochs are *not* copyable or movable. Only one can exist, and all users
+ * of it work with pointers
*/
Epoch(const Epoch&) = delete;
Epoch(Epoch&&) = delete;
@@ -97,23 +98,20 @@ public:
}
BufView get_buffer() {
- return m_buffer->get_buffer_view();
- }
-
- BufView get_flush_buffer() {
- return m_buffer->get_flush_buffer_view();
+ return m_buffer->get_buffer_view(m_buffer_head);
}
-
/*
- * Returns a new Epoch object that is a copy of this one. The new object will also contain
- * a copy of the m_structure, rather than a reference to the same one. The epoch number of
- * the new epoch will be set to the provided argument.
+ * Returns a new Epoch object that is a copy of this one. The new object
+ * will also contain a copy of the m_structure, rather than a reference to
+ * the same one. The epoch number of the new epoch will be set to the
+ * provided argument.
*/
Epoch *clone(size_t number) {
std::unique_lock<std::mutex> m_buffer_lock;
auto epoch = new Epoch(number);
epoch->m_buffer = m_buffer;
+ epoch->m_buffer_head = m_buffer_head;
if (m_structure) {
epoch->m_structure = m_structure->copy();
@@ -125,12 +123,10 @@ public:
}
/*
- * Check if a merge can be started from this Epoch.
- * At present, without concurrent merging, this simply
- * checks if there is currently a scheduled merge based
- * on this Epoch. If there is, returns false. If there
- * isn't, return true and set a flag indicating that
- * there is an active merge.
+ * Check if a merge can be started from this Epoch. At present, without
+ * concurrent merging, this simply checks if there is currently a scheduled
+ * merge based on this Epoch. If there is, returns false. If there isn't,
+ * return true and set a flag indicating that there is an active merge.
*/
bool prepare_reconstruction() {
auto old = m_active_merge.load();
@@ -176,7 +172,8 @@ public:
}
bool advance_buffer_head(size_t head) {
- return m_buffer->advance_head(head);
+ m_buffer_head = head;
+ return m_buffer->advance_head(m_buffer_head);
}
private:
@@ -187,7 +184,6 @@ private:
std::mutex m_cv_lock;
std::mutex m_buffer_lock;
-
std::atomic<bool> m_active_merge;
/*
@@ -199,5 +195,6 @@ private:
std::atomic<size_t> m_active_jobs;
bool m_active;
size_t m_epoch_number;
+ size_t m_buffer_head;
};
}
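
Taken together, the Epoch.h hunks make each epoch remember the buffer head it reads from: m_buffer_head is set at construction, copied by clone(), updated by advance_buffer_head(), and passed to get_buffer_view(). A miniature model of that lifecycle, with stand-in Buffer and view types (the real BufferView is considerably more involved):

#include <cstddef>

struct Buffer {
    bool advance_head(size_t) { return true; }
    size_t get_buffer_view(size_t head) { return head; } /* view pins `head` */
};

struct Epoch {
    Buffer *buffer;
    size_t buffer_head = 0; /* head this epoch's queries read from */

    size_t get_buffer() { return buffer->get_buffer_view(buffer_head); }

    Epoch *clone(size_t /*number*/) {
        /* the recorded head travels with the clone */
        return new Epoch{buffer, buffer_head};
    }

    bool advance_buffer_head(size_t head) {
        buffer_head = head;                 /* remember the head we own */
        return buffer->advance_head(head);  /* then apply it to the buffer */
    }
};

In advance_epoch(), the newly installed epoch receives advance_buffer_head(buffer_head) from the finished flush, while queries still running against the old epoch keep reading from the head that epoch recorded.
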
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 099b7a2..30fffed 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -110,7 +110,7 @@ public:
size_t split_idx = m_cap - (m_head % m_cap);
memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>));
- memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
} else {
memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
}
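
The one-line BufferView.h fix is byte-versus-element pointer arithmetic: the destination buffer is a std::byte*, so offsetting it by split_idx advanced only split_idx bytes rather than split_idx records, and the second memcpy overwrote most of the first segment. A self-contained illustration of the corrected wraparound copy, using plain int records in place of Wrapped<R>:

#include <cstddef>
#include <cstring>

/* Copy a wrapped-around ring-buffer region into a flat byte buffer.
 * `head` and `count` are in records; the destination offset for the
 * second segment must be converted to bytes. */
void copy_ring(std::byte *dst, const int *ring, size_t cap,
               size_t head, size_t count) {
    size_t split = cap - (head % cap); /* records before the wrap point */
    if (count > split) {
        std::memcpy(dst, ring + (head % cap), split * sizeof(int));
        std::memcpy(dst + split * sizeof(int), /* byte offset, not record offset */
                    ring, (count - split) * sizeof(int));
    } else {
        std::memcpy(dst, ring + (head % cap), count * sizeof(int));
    }
}
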
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index b35cadd..e9874e0 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -51,11 +51,10 @@ public:
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
- Shard* shards[2];
- shards[0] = base_level->m_shards[0].get();
- shards[1] = new_level->m_shards[0].get();
+ std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+ new_level->m_shards[0].get()};
- res->m_shards[0] = std::make_shared<S>(shards, 2);
+ res->m_shards[0] = std::make_shared<S>(shards);
return std::shared_ptr<InternalLevel>(res);
}
@@ -75,17 +74,17 @@ public:
return;
}
- Shard *shards[level->m_shard_cnt];
- for (size_t i=0; i<level->m_shard_cnt; i++) {
- shards[i] = level->m_shards[i].get();
+ std::vector<S*> shards;
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
}
if (m_shard_cnt == m_shards.size()) {
- m_pending_shard = new S(shards, level->m_shard_cnt);
+ m_pending_shard = new S(shards);
return;
}
- auto tmp = new S(shards, level->m_shard_cnt);
+ auto tmp = new S(shards);
m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
++m_shard_cnt;
@@ -131,13 +130,12 @@ public:
return nullptr;
}
- Shard *shards[m_shard_cnt];
-
- for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = m_shards[i].get();
+ std::vector<Shard *> shards;
+ for (auto shard : m_shards) {
+ if (shard) shards.emplace_back(shard.get());
}
- return new S(shards, m_shard_cnt);
+ return new S(shards);
}
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
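
All three InternalLevel.h hunks make the same swap: the variable-length array Shard *shards[n] (a GCC/Clang extension, not standard C++) plus an explicit count becomes a std::vector that also skips empty slots, which the VLA version could not do without extra bookkeeping. The pattern in isolation, with a stand-in shard type:

#include <memory>
#include <vector>

struct Shard {
    Shard() = default;
    explicit Shard(const std::vector<Shard *> & /*sources*/) {}
};

/* Collect only the live shards, then hand the vector to the
 * vector-taking constructor introduced by this commit. */
Shard *combine(const std::vector<std::shared_ptr<Shard>> &slots) {
    std::vector<Shard *> shards;
    for (const auto &s : slots) {
        if (s) shards.emplace_back(s.get());
    }
    return new Shard(shards);
}
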
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index eeb3dc9..7edde2f 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -133,18 +133,18 @@ public:
return m_tombstone_filter->get_memory_usage();
}
- BufferView<R> get_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
auto f = std::bind(release_head_reference, (void *) this, head);
return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
- BufferView<R> get_flush_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
auto f = std::bind(release_head_reference, (void *) this, head);
- return BufferView<R>(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -167,23 +167,39 @@ public:
buffer_head new_hd = {new_head, 0};
buffer_head cur_hd;
- /* move the current head into the old head */
+ /* replace current head with new head */
do {
- buffer_head cur_hd = m_head.load();
- m_old_head.store(cur_hd);
+ cur_hd = m_head.load();
} while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
m_active_head_advance.store(false);
return true;
}
- size_t get_head() {
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
do {
- cur_hd = m_head.load();
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
return new_hd.head_idx;
}
@@ -254,22 +270,22 @@ private:
buffer_head cur_hd, new_hd;
do {
- if (buffer->m_head.load().head_idx == head) {
- cur_hd = buffer->m_head;
- assert(cur_hd.refcnt > 0);
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
} else {
- cur_hd = buffer->m_old_head;
- assert(cur_hd.refcnt > 0);
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
- }
+ }
_mm_pause();
} while(true);
}
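
The MutableBuffer.h changes center on a two-slot head scheme: the current and previous heads each pack an index and a reference count so both move in one compare-and-swap; get_head(target) pins whichever slot still holds target, and release_head_reference() decrements the matching slot. A sketch of the acquire side follows. Note that the buffer_head layout here is an assumption: only the field names head_idx and refcnt appear in this patch.

#include <atomic>
#include <cstddef>

struct buffer_head {
    size_t head_idx; /* index of the first live record */
    size_t refcnt;   /* BufferViews currently pinned to this head */
};

std::atomic<buffer_head> m_head;     /* current head */
std::atomic<buffer_head> m_old_head; /* previous head, kept alive until
                                        its last reader releases it */

/* Pin `target` on whichever slot currently holds it. As the in-diff
 * FIXME notes, if `target` matches neither slot this loops forever. */
size_t get_head(size_t target) {
    buffer_head cur{0, 0}, next{0, 0};
    bool acquired = false;
    do {
        if (m_old_head.load().head_idx == target) {
            cur = m_old_head.load();
            next = {cur.head_idx, cur.refcnt + 1};
            acquired = m_old_head.compare_exchange_strong(cur, next);
        } else if (m_head.load().head_idx == target) {
            cur = m_head.load();
            next = {cur.head_idx, cur.refcnt + 1};
            acquired = m_head.compare_exchange_strong(cur, next);
        }
    } while (!acquired);
    return next.head_idx;
}

The release side mirrors this loop, checking m_old_head first and spinning with _mm_pause() between attempts; the refcnt == 0 continue replaces the old assertion, presumably so a release racing with advance_head() retries instead of aborting.
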