path: root/include
author    Douglas Rumbaugh <dbr4@psu.edu>  2024-01-17 18:22:00 -0500
committer Douglas Rumbaugh <dbr4@psu.edu>  2024-01-17 18:22:00 -0500
commit    138c793b0a58577713d98c98bb140cf1d9c79bee (patch)
tree      921197e2ba521704cb379ac8069189e70f8dee3d /include
parent    2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff)
download  dynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding issue with tombstones not being completely canceled: a very small number appear in the wrong order during the static structure test.
Diffstat (limited to 'include')
-rw-r--r--  include/framework/DynamicExtension.h       91
-rw-r--r--  include/framework/interface/Shard.h          5
-rw-r--r--  include/framework/scheduling/Epoch.h        51
-rw-r--r--  include/framework/structure/BufferView.h     2
-rw-r--r--  include/framework/structure/InternalLevel.h 26
-rw-r--r--  include/framework/structure/MutableBuffer.h 58
-rw-r--r--  include/query/rangequery.h                   5
-rw-r--r--  include/shard/ISAMTree.h                    35
8 files changed, 130 insertions, 143 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index cb21ae3..7590de2 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -49,7 +49,7 @@ public:
, m_buffer(new Buffer(buffer_lwm, buffer_hwm))
{
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(0, vers, m_buffer);
+ auto epoch = new _Epoch(0, vers, m_buffer, 0);
m_versions.insert(vers);
m_epochs.insert({0, epoch});
@@ -169,6 +169,15 @@ public:
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
+
+ if (vers->get_levels().size() > 0) {
+ for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
+ shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
+ }
+ }
+ }
+
/*
* construct a shard from the buffer view. We'll hold the view
* for as short a time as possible: once the records are exfiltrated
@@ -182,24 +191,7 @@ public:
}
}
- if (vers->get_levels().size() > 0) {
- for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
- if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
- shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
- }
- }
- }
-
- Shard *shards_array[shards.size()];
-
- size_t j = 0;
- for (size_t i=0; i<shards.size(); i++) {
- if (shards[i]) {
- shards_array[j++] = shards[i];
- }
- }
-
- Shard *flattened = new S(shards_array, j);
+ Shard *flattened = new S(shards);
for (auto shard : shards) {
delete shard;
@@ -302,7 +294,7 @@ private:
return m_epochs[cur_epoch];
}
- void advance_epoch() {
+ void advance_epoch(size_t buffer_head) {
m_epoch_transition_lk.lock();
@@ -322,26 +314,14 @@ private:
*/
enforce_delete_invariant(new_epoch);
- #if 0
- /*
- * Update the new Epoch to contain the buffers from the old one
- * that it doesn't currently have if using a multi-threaded
- * scheduler (otherwise, there is only one buffer that is
- * reused, so it shouldn't be removed)
- */
- if constexpr (!std::same_as<SCHED, SerialScheduler>) {
- size_t old_buffer_cnt = new_epoch->clear_buffers();
-
- /*
- * skip the first buffer, as this was flushed into the epoch's
- * structure already, and copy all the other buffer references
- * into the new epoch
- */
- for (size_t i=1; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers()[i]);
- }
- }
- #endif
+ // FIXME: this may currently fail because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ auto res = new_epoch->advance_buffer_head(buffer_head);
+ assert(res);
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
@@ -425,40 +405,29 @@ private:
}
static void reconstruction(void *arguments) {
- ReconstructionArgs<R, S, Q, L> *args = (ReconstructionArgs<R, S, Q, L> *) arguments;
-
+ auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
Structure *vers = args->epoch->get_structure();
-
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we can
- * flush as many records as possible in one go. The reconstruction was done so
- * as to make room for the full buffer anyway, so there's no real benefit to doing
- * this first.
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
*/
auto buffer_view = args->epoch->get_buffer();
size_t new_head = buffer_view.get_tail();
- /*
- * if performing a compaction, don't flush the buffer, as
- * there is no guarantee that any necessary reconstructions
+ /*
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
*/
if (!args->compaction) {
vers->flush_buffer(std::move(buffer_view));
-
- // FIXME: this may currently fail because there isn't any
- // query preemption yet. At this point, we'd need to either
- // 1) wait for all queries on the old_head to finish
- // 2) kill all queries on the old_head
- // 3) somehow migrate all queries on the old_head to the new
- // version
- auto res = args->epoch->advance_buffer_head(new_head);
- assert(res);
}
args->epoch->end_job();
@@ -470,7 +439,7 @@ private:
* part of a compaction
*/
if (!args->compaction) {
- ((DynamicExtension *) args->extension)->advance_epoch();
+ ((DynamicExtension *) args->extension)->advance_epoch(new_head);
}
((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false;
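
Note: the DynamicExtension.h hunks above move the per-level shard collection ahead of the buffer-view shard and replace the stack-allocated shard array with a direct vector-based construction. A minimal sketch of that collection-then-construct pattern, assuming a shard type with the vector constructor introduced by this commit (flatten_levels and the Levels parameter are stand-ins, not framework names):

// Sketch only, not part of the diff: build one flattened shard from the
// combined shard of every non-empty level, deepest level first, mirroring
// the loop added in DynamicExtension.h.
#include <cstddef>
#include <vector>

template <typename Shard, typename Levels>
Shard *flatten_levels(const Levels &levels) {
    std::vector<Shard*> shards;
    for (std::size_t i = levels.size(); i-- > 0; ) {
        if (levels[i] && levels[i]->get_record_count() > 0) {
            shards.emplace_back(levels[i]->get_combined_shard());
        }
    }
    return new Shard(shards);   // vector-based constructor added by this commit
}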
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
index 2357795..8c4db34 100644
--- a/include/framework/interface/Shard.h
+++ b/include/framework/interface/Shard.h
@@ -12,6 +12,7 @@
#include "util/types.h"
#include "framework/interface/Record.h"
+#include <vector>
namespace de {
@@ -19,8 +20,8 @@ namespace de {
// determining a good way to handle additional template arguments
// to get the Record type into play
template <typename S>
-concept ShardInterface = requires(S s, S **spp, void *p, bool b, size_t i) {
- {S(spp, i)};
+concept ShardInterface = requires(S s, std::vector<S*> spp, void *p, bool b, size_t i) {
+ {S(spp)};
/*
{S(mutable buffer)}
{s.point_lookup(r, b) } -> std::convertible_to<void*>
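
Note: the ShardInterface change swaps the raw S** / length pair for a std::vector<S*> in the required constructor. A toy type satisfying just that constructor requirement, purely as a sketch (ToyShard and its members are hypothetical, not framework types):

// Sketch only, not part of the diff: the shape a shard type now needs in
// order to satisfy {S(spp)} with spp of type std::vector<S*>.
#include <cstddef>
#include <vector>

class ToyShard {
public:
    explicit ToyShard(const std::vector<ToyShard*> &shards) {
        for (auto *s : shards) {
            if (s) m_reccnt += s->m_reccnt;   // merge inputs, ignoring null slots
        }
    }

    std::size_t get_record_count() const { return m_reccnt; }

private:
    std::size_t m_reccnt = 0;
};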
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index b005ff6..45ee17d 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -32,15 +32,17 @@ public:
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
+ , m_buffer_head(0)
{}
- Epoch(size_t number, Structure *structure, Buffer *buff)
+ Epoch(size_t number, Structure *structure, Buffer *buff, size_t head)
: m_buffer(buff)
, m_structure(structure)
, m_active_jobs(0)
, m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
+ , m_buffer_head(head)
{
structure->take_reference();
}
@@ -48,22 +50,21 @@ public:
~Epoch() {
assert(m_active_jobs.load() == 0);
- /* FIXME: this is needed to keep the destructor from
- * sometimes locking up here. But there *shouldn't* be
- * any threads waiting on this signal at object destruction,
- * so something else is going on here that needs looked into
+ /* FIXME: this is needed to keep the destructor from sometimes locking
+ * up here. But there *shouldn't* be any threads waiting on this signal
+ * at object destruction, so something else is going on here that needs
+ * looked into
*/
- //m_active_cv.notify_all();
+ // m_active_cv.notify_all();
if (m_structure) {
m_structure->release_reference();
}
}
-
- /*
- * Epochs are *not* copyable or movable. Only one can exist, and all users of
- * it work with pointers
+ /*
+ * Epochs are *not* copyable or movable. Only one can exist, and all users
+ * of it work with pointers
*/
Epoch(const Epoch&) = delete;
Epoch(Epoch&&) = delete;
@@ -97,23 +98,20 @@ public:
}
BufView get_buffer() {
- return m_buffer->get_buffer_view();
- }
-
- BufView get_flush_buffer() {
- return m_buffer->get_flush_buffer_view();
+ return m_buffer->get_buffer_view(m_buffer_head);
}
-
/*
- * Returns a new Epoch object that is a copy of this one. The new object will also contain
- * a copy of the m_structure, rather than a reference to the same one. The epoch number of
- * the new epoch will be set to the provided argument.
+ * Returns a new Epoch object that is a copy of this one. The new object
+ * will also contain a copy of the m_structure, rather than a reference to
+ * the same one. The epoch number of the new epoch will be set to the
+ * provided argument.
*/
Epoch *clone(size_t number) {
std::unique_lock<std::mutex> m_buffer_lock;
auto epoch = new Epoch(number);
epoch->m_buffer = m_buffer;
+ epoch->m_buffer_head = m_buffer_head;
if (m_structure) {
epoch->m_structure = m_structure->copy();
@@ -125,12 +123,10 @@ public:
}
/*
- * Check if a merge can be started from this Epoch.
- * At present, without concurrent merging, this simply
- * checks if there is currently a scheduled merge based
- * on this Epoch. If there is, returns false. If there
- * isn't, return true and set a flag indicating that
- * there is an active merge.
+ * Check if a merge can be started from this Epoch. At present, without
+ * concurrent merging, this simply checks if there is currently a scheduled
+ * merge based on this Epoch. If there is, returns false. If there isn't,
+ * return true and set a flag indicating that there is an active merge.
*/
bool prepare_reconstruction() {
auto old = m_active_merge.load();
@@ -176,7 +172,8 @@ public:
}
bool advance_buffer_head(size_t head) {
- return m_buffer->advance_head(head);
+ m_buffer_head = head;
+ return m_buffer->advance_head(m_buffer_head);
}
private:
@@ -187,7 +184,6 @@ private:
std::mutex m_cv_lock;
std::mutex m_buffer_lock;
-
std::atomic<bool> m_active_merge;
/*
@@ -199,5 +195,6 @@ private:
std::atomic<size_t> m_active_jobs;
bool m_active;
size_t m_epoch_number;
+ size_t m_buffer_head;
};
}
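
Note: Epoch::prepare_reconstruction() (partially visible above) gates reconstruction on an atomic flag: it fails if a merge is already scheduled against this Epoch, and otherwise claims the merge. A minimal sketch of that check-and-set using only std::atomic (the surrounding class is omitted, and the real implementation may differ in detail):

// Sketch only, not part of the diff: claim the merge only if it is unclaimed,
// which is the behavior the prepare_reconstruction() comment describes.
#include <atomic>

std::atomic<bool> m_active_merge{false};

bool prepare_reconstruction_sketch() {
    bool expected = false;
    /* true only if no merge was active; atomically sets the flag on success */
    return m_active_merge.compare_exchange_strong(expected, true);
}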
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 099b7a2..30fffed 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -110,7 +110,7 @@ public:
size_t split_idx = m_cap - (m_head % m_cap);
memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped<R>));
- memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped<R>));
} else {
memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped<R>));
}
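
Note: the one-line BufferView.h fix scales the destination offset of the second memcpy. The destination is a raw std::byte buffer, so an offset expressed in records lands split_idx bytes in rather than split_idx records in. A compact sketch of the corrected wrap-around copy, with Rec standing in for Wrapped<R>:

// Sketch only, not part of the diff: copy a wrapped ring-buffer region into a
// flat byte buffer. Assumes the region actually wraps (count >= split). The
// second memcpy's destination offset must be in bytes, which is exactly what
// the fix above changes.
#include <cstddef>
#include <cstring>

struct Rec { int key; int value; };   // stand-in for Wrapped<R>

void copy_wrapped_region(std::byte *dst, const Rec *ring, std::size_t cap,
                         std::size_t head, std::size_t count) {
    std::size_t split = cap - (head % cap);          // records before the wrap point
    std::memcpy(dst, ring + (head % cap), split * sizeof(Rec));
    std::memcpy(dst + split * sizeof(Rec),           // byte offset, hence the scaling
                ring, (count - split) * sizeof(Rec));
}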
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index b35cadd..e9874e0 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -51,11 +51,10 @@ public:
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
- Shard* shards[2];
- shards[0] = base_level->m_shards[0].get();
- shards[1] = new_level->m_shards[0].get();
+ std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+ new_level->m_shards[0].get()};
- res->m_shards[0] = std::make_shared<S>(shards, 2);
+ res->m_shards[0] = std::make_shared<S>(shards);
return std::shared_ptr<InternalLevel>(res);
}
@@ -75,17 +74,17 @@ public:
return;
}
- Shard *shards[level->m_shard_cnt];
- for (size_t i=0; i<level->m_shard_cnt; i++) {
- shards[i] = level->m_shards[i].get();
+ std::vector<S*> shards;
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
}
if (m_shard_cnt == m_shards.size()) {
- m_pending_shard = new S(shards, level->m_shard_cnt);
+ m_pending_shard = new S(shards);
return;
}
- auto tmp = new S(shards, level->m_shard_cnt);
+ auto tmp = new S(shards);
m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
++m_shard_cnt;
@@ -131,13 +130,12 @@ public:
return nullptr;
}
- Shard *shards[m_shard_cnt];
-
- for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = m_shards[i].get();
+ std::vector<Shard *> shards;
+ for (auto shard : m_shards) {
+ if (shard) shards.emplace_back(shard.get());
}
- return new S(shards, m_shard_cnt);
+ return new S(shards);
}
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index eeb3dc9..7edde2f 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -133,18 +133,18 @@ public:
return m_tombstone_filter->get_memory_usage();
}
- BufferView<R> get_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
auto f = std::bind(release_head_reference, (void *) this, head);
return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
- BufferView<R> get_flush_buffer_view() {
- size_t head = get_head();
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
auto f = std::bind(release_head_reference, (void *) this, head);
- return BufferView<R>(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -167,23 +167,39 @@ public:
buffer_head new_hd = {new_head, 0};
buffer_head cur_hd;
- /* move the current head into the old head */
+ /* replace current head with new head */
do {
- buffer_head cur_hd = m_head.load();
- m_old_head.store(cur_hd);
+ cur_hd = m_head.load();
} while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
m_active_head_advance.store(false);
return true;
}
- size_t get_head() {
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
do {
- cur_hd = m_head.load();
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
return new_hd.head_idx;
}
@@ -254,22 +270,22 @@ private:
buffer_head cur_hd, new_hd;
do {
- if (buffer->m_head.load().head_idx == head) {
- cur_hd = buffer->m_head;
- assert(cur_hd.refcnt > 0);
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
} else {
- cur_hd = buffer->m_old_head;
- assert(cur_hd.refcnt > 0);
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0) continue;
new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
break;
}
- }
+ }
_mm_pause();
} while(true);
}
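
Note: the MutableBuffer.h changes make the head reference count live on whichever of m_head / m_old_head matches the caller's target, on both acquire and release. A condensed sketch of the acquire side, carrying the same caveat the new FIXME calls out (a target matching neither slot spins forever); member names mirror the diff, everything else is simplified:

// Sketch only, not part of the diff: pin a buffer head by bumping the refcnt
// of whichever slot (old or current) still holds it, via compare-and-swap.
#include <atomic>
#include <cstddef>

struct buffer_head { std::size_t head_idx; std::size_t refcnt; };

std::atomic<buffer_head> m_head{};
std::atomic<buffer_head> m_old_head{};

std::size_t acquire_head(std::size_t target_head) {
    buffer_head cur, next{};
    bool acquired = false;
    do {
        if (m_old_head.load().head_idx == target_head) {
            cur = m_old_head.load();
            next = {cur.head_idx, cur.refcnt + 1};
            acquired = m_old_head.compare_exchange_strong(cur, next);
        } else if (m_head.load().head_idx == target_head) {
            cur = m_head.load();
            next = {cur.head_idx, cur.refcnt + 1};
            acquired = m_head.compare_exchange_strong(cur, next);
        }
        /* as the FIXME above notes, a target matching neither slot loops forever */
    } while (!acquired);
    return next.head_idx;
}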
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index ad5b767..c44f5d7 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -84,11 +84,11 @@ public:
* roll the pointer forward to the first record that is
* greater than or equal to the lower bound.
*/
- while(ptr->rec.key < p->lower_bound) {
+ while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) {
ptr++;
}
- while (ptr->rec.key <= p->upper_bound && ptr < shard->get_data() + s->stop_idx) {
+ while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) {
records.emplace_back(*ptr);
ptr++;
}
@@ -152,6 +152,7 @@ public:
} else {
auto& cursor = cursors[tmp_n - now.version - 1];
if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec);
+
pq.pop();
if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
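
Note: the rangequery.h fix bounds the roll-forward loop before it dereferences, so a shard whose keys all fall below the query's lower bound no longer reads past its last record. The same pattern in isolation (Rec and scan_range are illustrative, not framework names):

// Sketch only, not part of the diff: a bounds-checked scan over a sorted run,
// checking ptr < end before every dereference.
#include <vector>

struct Rec { int key; };

std::vector<Rec> scan_range(const Rec *begin, const Rec *end, int lower, int upper) {
    std::vector<Rec> out;
    const Rec *ptr = begin;
    while (ptr < end && ptr->key < lower) ptr++;               // roll forward, bounded
    while (ptr < end && ptr->key <= upper) out.push_back(*ptr++);
    return out;
}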
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 6b2f6b5..932e767 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -62,10 +62,13 @@ public:
{
TIMER_INIT();
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
TIMER_START();
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>));
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
buffer.copy_to_buffer((byte *) temp_buffer);
auto base = temp_buffer;
@@ -99,6 +102,7 @@ public:
base++;
}
+
TIMER_STOP();
auto copy_time = TIMER_RESULT();
@@ -112,7 +116,7 @@ public:
free(temp_buffer);
}
- ISAMTree(ISAMTree** runs, size_t len)
+ ISAMTree(std::vector<ISAMTree*> &shards)
: m_bf(nullptr)
, m_isam_nodes(nullptr)
, m_root(nullptr)
@@ -124,19 +128,19 @@ public:
, m_data(nullptr)
{
std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
+ cursors.reserve(shards.size());
- PriorityQueue<Wrapped<R>> pq(len);
+ PriorityQueue<Wrapped<R>> pq(shards.size());
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
- for (size_t i = 0; i < len; ++i) {
- if (runs[i]) {
- auto base = runs[i]->get_data();
- cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()});
- attemp_reccnt += runs[i]->get_record_count();
- tombstone_count += runs[i]->get_tombstone_count();
+ for (size_t i = 0; i < shards.size(); ++i) {
+ if (shards[i]) {
+ auto base = shards[i]->get_data();
+ cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+ attemp_reccnt += shards[i]->get_record_count();
+ tombstone_count += shards[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
@@ -144,10 +148,9 @@ public:
}
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
-
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
while (pq.size()) {
auto now = pq.peek();
@@ -165,6 +168,8 @@ public:
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
if (cursor.ptr->is_tombstone()) {
+ //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n",
+ //now.version, next.version);
++m_tombstone_cnt;
m_bf->insert(cursor.ptr->rec);
}
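
Note: the vector-based ISAMTree constructor above is a k-way merge: one cursor per input shard and a priority queue ordered by the cursors' current records, with tombstone and delete handling layered on top. Stripped of the framework's wrapped-record, bloom-filter, and tombstone logic, the merge skeleton looks roughly like this (std:: containers stand in for the framework's PriorityQueue and Cursor types):

// Sketch only, not part of the diff: k-way merge of sorted runs with a min-heap
// of cursors. Tombstone cancellation and deleted-record filtering are omitted.
#include <cstddef>
#include <queue>
#include <vector>

struct Rec { int key; };

std::vector<Rec> kway_merge(const std::vector<std::vector<Rec>> &runs) {
    struct Cursor { const Rec *ptr; const Rec *end; };
    auto cmp = [](const Cursor &a, const Cursor &b) { return a.ptr->key > b.ptr->key; };
    std::priority_queue<Cursor, std::vector<Cursor>, decltype(cmp)> pq(cmp);

    for (const auto &run : runs) {
        if (!run.empty()) pq.push(Cursor{run.data(), run.data() + run.size()});
    }

    std::vector<Rec> out;
    while (!pq.empty()) {
        Cursor c = pq.top();
        pq.pop();
        out.push_back(*c.ptr);                  // emit the smallest current record
        if (++c.ptr < c.end) pq.push(c);        // advance cursor, re-insert if not exhausted
    }
    return out;
}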