summaryrefslogtreecommitdiffstats
path: root/include/shard
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-17 18:22:00 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-17 18:22:00 -0500
commit138c793b0a58577713d98c98bb140cf1d9c79bee (patch)
tree921197e2ba521704cb379ac8069189e70f8dee3d /include/shard
parent2117935e85412f3733ee0bcb1830c7fd0b129b29 (diff)
downloaddynamic-extension-138c793b0a58577713d98c98bb140cf1d9c79bee.tar.gz
Multiple concurrency bug fixes
A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test.
Diffstat (limited to 'include/shard')
-rw-r--r--include/shard/ISAMTree.h35
1 files changed, 20 insertions, 15 deletions
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 6b2f6b5..932e767 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -62,10 +62,13 @@ public:
{
TIMER_INIT();
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
TIMER_START();
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>));
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
buffer.copy_to_buffer((byte *) temp_buffer);
auto base = temp_buffer;
@@ -99,6 +102,7 @@ public:
base++;
}
+
TIMER_STOP();
auto copy_time = TIMER_RESULT();
@@ -112,7 +116,7 @@ public:
free(temp_buffer);
}
- ISAMTree(ISAMTree** runs, size_t len)
+ ISAMTree(std::vector<ISAMTree*> &shards)
: m_bf(nullptr)
, m_isam_nodes(nullptr)
, m_root(nullptr)
@@ -124,19 +128,19 @@ public:
, m_data(nullptr)
{
std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
+ cursors.reserve(shards.size());
- PriorityQueue<Wrapped<R>> pq(len);
+ PriorityQueue<Wrapped<R>> pq(shards.size());
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
- for (size_t i = 0; i < len; ++i) {
- if (runs[i]) {
- auto base = runs[i]->get_data();
- cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()});
- attemp_reccnt += runs[i]->get_record_count();
- tombstone_count += runs[i]->get_tombstone_count();
+ for (size_t i = 0; i < shards.size(); ++i) {
+ if (shards[i]) {
+ auto base = shards[i]->get_data();
+ cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+ attemp_reccnt += shards[i]->get_record_count();
+ tombstone_count += shards[i]->get_tombstone_count();
pq.push(cursors[i].ptr, i);
} else {
cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
@@ -144,10 +148,9 @@ public:
}
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
-
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
while (pq.size()) {
auto now = pq.peek();
@@ -165,6 +168,8 @@ public:
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
if (cursor.ptr->is_tombstone()) {
+ //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n",
+ //now.version, next.version);
++m_tombstone_cnt;
m_bf->insert(cursor.ptr->rec);
}