1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
/*
* include/util/SortedMerge.h
*
* Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
* A sorted array merge routine for use in Shard construction, as many
* shards will use a sorted array to represent their data. Also encapsulates
* the necessary tombstone-cancellation logic.
*
* FIXME: include generic per-record processing functionality for Shards that
* need it, to avoid needing to reprocess the array in the shard after
* creation.
*/
#pragma once
#include "util/Cursor.h"
#include "framework/interface/Shard.h"
#include "psu-ds/PriorityQueue.h"
namespace de {
using psudb::PriorityQueue;
using psudb::BloomFilter;
using psudb::queue_record;
using psudb::byte;
using psudb::CACHELINE_SIZE;
/*
 * Result summary returned by the sorted-array construction routines in this
 * file. Both counters default to zero so that a default-constructed
 * merge_info never carries indeterminate values; the type remains an
 * aggregate, so `merge_info info = {0, 0};` continues to work.
 */
struct merge_info {
    /* number of records written into the output buffer */
    size_t record_count = 0;
    /* number of the written records that are tombstones */
    size_t tombstone_count = 0;
};
/*
 * Build one cursor per entry of shards, in the same order as the input
 * vector, so that position i of the result always corresponds to shards[i].
 *
 * A non-null shard yields a cursor spanning [get_data(), get_data() +
 * get_record_count()). A null shard yields a placeholder cursor of
 * {nullptr, nullptr, 0, 0}.
 *
 * On return, *reccnt holds the sum of get_record_count() and *tscnt the sum
 * of get_tombstone_count() over all non-null shards. Both are reset to zero
 * before accumulation begins.
 */
template <RecordInterface R, ShardInterface<R> S>
static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) {
    std::vector<Cursor<Wrapped<R>>> result;
    result.reserve(shards.size());

    *reccnt = 0;
    *tscnt = 0;

    for (S *shard : shards) {
        /* keep a slot for missing shards so indices stay aligned */
        if (!shard) {
            result.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
            continue;
        }

        auto first = shard->get_data();
        const auto count = shard->get_record_count();

        result.emplace_back(Cursor{first, first + count, 0, count});
        *reccnt += count;
        *tscnt += shard->get_tombstone_count();
    }

    return result;
}
/*
 * Produce a sorted, filtered array of records from the contents of a buffer
 * view.
 *
 * The view's records are copied into a temporary allocation and sorted with
 * std::less<Wrapped<R>>. The sorted records are then scanned once and the
 * survivors are written, in order, to buffer:
 *   - a non-tombstone record immediately followed by a tombstone with an
 *     equal rec is dropped together with that tombstone;
 *   - a record for which is_deleted() is true is dropped;
 *   - everything else is copied, and tombstones are additionally inserted
 *     into bf when a filter is provided.
 *
 * No bounds checking is performed on buffer; the caller must supply enough
 * space for the records that are written.
 *
 * Returns the number of records written and how many of them are
 * tombstones.
 */
template <RecordInterface R>
static merge_info sorted_array_from_bufferview(BufferView<R> bv,
                                               Wrapped<R> *buffer,
                                               psudb::BloomFilter<R> *bf=nullptr) {
    /* scratch space holding a private, sortable copy of the view */
    Wrapped<R> *scratch = (Wrapped<R> *) psudb::sf_aligned_calloc(
        CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped<R>));
    bv.copy_to_buffer((byte *) scratch);

    const size_t total = bv.get_record_count();
    std::sort(scratch, scratch + total, std::less<Wrapped<R>>());

    merge_info result = {0, 0};

    size_t idx = 0;
    while (idx < total) {
        Wrapped<R> &current = scratch[idx];

        /*
         * A live record directly followed by its own tombstone: the two
         * cancel out and neither is emitted.
         */
        const bool cancelled = !current.is_tombstone()
                               && (idx + 1 < total)
                               && current.rec == scratch[idx + 1].rec
                               && scratch[idx + 1].is_tombstone();
        if (cancelled) {
            idx += 2;
            continue;
        }

        /* records removed via tagging are simply skipped */
        if (current.is_deleted()) {
            idx += 1;
            continue;
        }

        /*
         * FIXME: this masking shouldn't be necessary, but the tagged record
         * bypass doesn't seem to be working on this code-path, so it
         * ensures that tagged records from the buffer are able to be
         * dropped, eventually. It should only need to be &= 1.
         */
        current.header &= 3;

        buffer[result.record_count] = current;
        result.record_count++;

        if (current.is_tombstone()) {
            result.tombstone_count++;
            if (bf) {
                bf->insert(current.rec);
            }
        }

        idx++;
    }

    free(scratch);
    return result;
}
/*
 * Perform a sorted merge of the records within cursors into the provided
 * buffer. Includes tombstone and tagged delete cancellation logic, and
 * will insert tombstones into a bloom filter, if one is provided.
 *
 * Each cursor is registered with the priority queue using its index in
 * cursors as the queue "version", which is how a dequeued entry is mapped
 * back to the cursor that must be advanced.
 *
 * Cursors whose ptr is null (such as the {nullptr, nullptr, 0, 0}
 * placeholders that build_cursor_vec creates for missing shards) are
 * ignored, as there is nothing to read from them and the merge loop
 * dereferences every queued pointer.
 * NOTE(review): a cursor with a non-null ptr but no records is still
 * queued as-is; callers should not pass such cursors -- TODO confirm.
 *
 * The behavior of this function is undefined if the provided buffer does
 * not have space to contain all of the records within the input cursors.
 *
 * Returns the number of records written to buffer and how many of those
 * are tombstones.
 */
template <RecordInterface R>
static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors,
                                     Wrapped<R> *buffer,
                                     psudb::BloomFilter<R> *bf=nullptr) {
    // FIXME: For smaller cursor arrays, it may be more efficient to skip
    //        the priority queue and just do a scan.
    PriorityQueue<Wrapped<R>> pq(cursors.size());
    for (size_t i=0; i<cursors.size(); i++) {
        /* never queue a null record pointer; it would be dereferenced below */
        if (cursors[i].ptr != nullptr) {
            pq.push(cursors[i].ptr, i);
        }
    }

    merge_info info = {0, 0};
    while (pq.size()) {
        auto now = pq.peek();
        auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};

        /*
         * if the current record is not a tombstone, and the next record is
         * a tombstone that matches the current one, then the current one
         * has been deleted, and both it and its tombstone can be skipped
         * over.
         */
        if (!now.data->is_tombstone() && next.data != nullptr &&
            now.data->rec == next.data->rec && next.data->is_tombstone()) {

            pq.pop(); pq.pop();
            auto& cursor1 = cursors[now.version];
            auto& cursor2 = cursors[next.version];

            /* re-queue each source cursor only if it still has records */
            if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
            if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
        } else {
            auto& cursor = cursors[now.version];

            /* skip over records that have been deleted via tagging */
            if (!cursor.ptr->is_deleted()) {
                buffer[info.record_count++] = *cursor.ptr;

                /*
                 * if the record is a tombstone, increment the ts count and
                 * insert it into the bloom filter if one has been
                 * provided.
                 */
                if (cursor.ptr->is_tombstone()) {
                    info.tombstone_count++;
                    if (bf) {
                        bf->insert(cursor.ptr->rec);
                    }
                }
            }

            pq.pop();
            if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
        }
    }

    return info;
}
}
|