/*
 * include/framework/ExtensionStructure.h
 *
 * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
 *                    Dong Xie <dongx@psu.edu>
 *
 * All rights reserved. Published under the Modified BSD License.
 *
 */
#pragma once

#include <atomic>
#include <cassert>
#include <cmath>
#include <cstdio>
#include <numeric>
#include <vector>

#include "framework/structure/MutableBuffer.h"
#include "framework/structure/InternalLevel.h"
#include "framework/interface/Shard.h"
#include "framework/interface/Query.h"
#include "framework/interface/Record.h"
#include "framework/util/Configuration.h"
#include "framework/scheduling/Task.h"

#include "psu-util/timer.h"
#include "psu-ds/Alias.h"

namespace de {

template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
    typedef S Shard;
    typedef MutableBuffer<R> Buffer;

public:
    ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
        : m_scale_factor(scale_factor)
        , m_max_delete_prop(max_delete_prop)
        , m_buffer_size(buffer_size)
    {}

    ~ExtensionStructure() = default;

    /*
     * Create a shallow copy of this extension structure. The copy will share
     * references to the same levels/shards as the original, but will have
     * its own lists. As all of the shards are immutable (with the exception
     * of deletes), the copy can be restructured with merges, etc., without
     * affecting the original.
     *
     * NOTE: When using tagged deletes, a delete of a record in the original
     * structure will affect the copy, so long as the copy retains a
     * reference to the same shard as the original. This could cause
     * synchronization problems under tagging with concurrency. Any deletes
     * in this context will need to be forwarded to the appropriate
     * structures manually.
     */
    ExtensionStructure<R, S, Q, L> *copy() {
        /* the constructor takes (buffer_size, scale_factor, max_delete_prop) */
        auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, m_max_delete_prop);
        for (size_t i=0; i<m_levels.size(); i++) {
            new_struct->m_levels.push_back(m_levels[i]->clone());
        }

        return new_struct;
    }
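
    /*
     * A minimal usage sketch of copy(), assuming concrete MyRec, MyShard,
     * and MyQuery types that satisfy the interface concepts (all three
     * names are placeholders, not part of the framework):
     *
     *   ExtensionStructure<MyRec, MyShard, MyQuery> base(1000, 8, 0.05);
     *   auto snapshot = base.copy();  // shares shards with `base`
     *   // restructure `snapshot` freely; `base` is unaffected, modulo the
     *   // tagged-delete caveat noted above
     *   delete snapshot;              // caller owns the returned pointer
     */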

    /*
     * Search for a record matching the argument and mark it deleted by
     * setting the delete bit in its wrapped header. Returns 1 if a matching
     * record was found and deleted, and 0 if a matching record was not
     * found.
     *
     * This function will stop after finding the first matching record. It
     * is assumed that no duplicate records exist. In the case of
     * duplicates, this function will still "work", in the sense of
     * "delete the first match".
     */
    int tagged_delete(const R &rec) {
        for (auto level : m_levels) {
            if (level && level->delete_record(rec)) {
                return 1;
            }
        }

        /*
         * If the record to be erased wasn't found, return 0. The
         * DynamicExtension itself will then search the active
         * Buffers.
         */
        return 0;
    }

    /*
     * Merge the buffer down into the tree, completing any other merges
     * required to make room for it.
     */
    inline bool merge_buffer(Buffer *buffer) {
        assert(can_merge_with(0, buffer->get_record_count()));

        buffer->start_merge();
        merge_buffer_into_l0(buffer);
        buffer->finish_merge();

        buffer->truncate();

        return true;
    }
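
    /*
     * A minimal sketch of the intended merge flow from the caller's side
     * (the scheduling shown is illustrative, not prescriptive):
     *
     *   auto tasks = structure->get_merge_tasks(buffer->get_record_count());
     *   for (auto &task : tasks) {
     *       // apply each task via merge_levels(); MergeTask's fields are
     *       // defined in framework/scheduling/Task.h and not shown here
     *   }
     *   structure->merge_buffer(buffer);  // asserts that L0 now has room
     */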

    /*
     * Return the total number of records (including tombstones) within all
     * of the levels of the structure.
     */
    size_t get_record_count() {
        size_t cnt = 0;
        for (size_t i=0; i<m_levels.size(); i++) {
            if (m_levels[i]) cnt += m_levels[i]->get_record_count();
        }

        return cnt;
    }

    /*
     * Return the total number of tombstones contained within all of the
     * levels of the structure.
     */
    size_t get_tombstone_cnt() {
        size_t cnt = 0;
        for (size_t i=0; i<m_levels.size(); i++) {
            if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
        }

        return cnt;
    }

    /*
     * Return the number of levels within the structure. Note that not
     * all of these levels are necessarily populated.
     */
    size_t get_height() {
        return m_levels.size();
    }

    /*
     * Return the amount of memory (in bytes) used by the shards within the
     * structure for storing the primary data structure and raw data.
     */
    size_t get_memory_usage() {
        size_t cnt = 0;
        for (size_t i=0; i<m_levels.size(); i++) {
            if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
        }

        return cnt;
    }

    /*
     * Return the amount of memory (in bytes) used by the shards within the
     * structure for storing auxiliary data structures. This total does not
     * include memory used for the main data structure, or raw data.
     */
    size_t get_aux_memory_usage() {
        size_t cnt = 0;
        for (size_t i=0; i<m_levels.size(); i++) {
            if (m_levels[i]) {
                cnt += m_levels[i]->get_aux_memory_usage();
            }
        }

        return cnt;
    }

    /*
     * Validate that no level in the structure exceeds its maximum tombstone
     * capacity. This is used to trigger preemptive compactions at the end
     * of the merge process.
     */
    bool validate_tombstone_proportion() {
        long double ts_prop;
        for (size_t i=0; i<m_levels.size(); i++) {
            if (m_levels[i]) {
                ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
                if (ts_prop > (long double) m_max_delete_prop) {
                    return false;
                }
            }
        }

        return true;
    }

    /*
     * Validate the tombstone proportion of a single level.
     */
    bool validate_tombstone_proportion(level_index level) {
        long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
        return ts_prop <= (long double) m_max_delete_prop;
    }
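
    /*
     * Worked example of the proportion check (values assumed purely for
     * illustration): with m_buffer_size = 1000, m_scale_factor = 8, and
     * m_max_delete_prop = 0.05, level 1 has a record capacity of
     * 1000 * 8^2 = 64000. A level-1 tombstone count of 500 gives
     * 500 / 64000 ~= 0.0078 <= 0.05, so the check passes; any count above
     * 3200 would fail it.
     */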

    /*
     * Return a reference to the underlying vector of levels within the
     * structure.
     */
    std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
        return m_levels;
    }

    /*
     * Determine the sequence of level merges required to make room in L0
     * for a buffer containing buffer_reccnt records. The tasks are ordered
     * from the deepest level upward, and an empty list is returned when
     * the buffer can be merged into L0 directly.
     */
    std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) {
        std::vector<MergeTask> merges;

        /*
         * The buffer -> L0 merge task is not included, so if that merge
         * can be done without any other changes, just return an empty
         * list.
         */
        if (can_merge_with(0, buffer_reccnt)) {
            return merges;
        }

        level_index merge_base_level = find_mergable_level(0);
        if (merge_base_level == -1) {
            merge_base_level = grow();
        }

        for (level_index i=merge_base_level; i>0; i--) {
            MergeTask task = {i-1, i};

            /*
             * The amount of storage required for the merge accounts
             * for the cost of storing the new records, along with the
             * cost of retaining the old records during the process
             * (hence the 2x multiplier).
             *
             * FIXME: currently does not account for the *actual* size
             * of the shards, only the storage for the records
             * themselves.
             */
            size_t reccnt = m_levels[i-1]->get_record_count();
            if constexpr (L == LayoutPolicy::LEVELING) {
                if (can_merge_with(i, reccnt)) {
                    reccnt += m_levels[i]->get_record_count();
                }
            }
            //task.m_size = 2 * reccnt * sizeof(R);

            merges.push_back(task);
        }

        return merges;
    }
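
    /*
     * Illustrative result (structure state assumed): under LEVELING, if
     * levels 0 and 1 are both at capacity while level 2 has room, the
     * returned sequence is {1, 2} followed by {0, 1}; deeper merges come
     * first so each level is emptied before receiving the level above it.
     */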

    /*
     * Determine the sequence of level merges required to free up room in
     * source_level, ordered from the deepest level upward.
     */
    std::vector<MergeTask> get_merge_tasks_from_level(level_index source_level) {
        std::vector<MergeTask> merges;

        level_index merge_base_level = find_mergable_level(source_level);
        if (merge_base_level == -1) {
            merge_base_level = grow();
        }

        for (level_index i=merge_base_level; i>source_level; i--) {
            MergeTask task = {i - 1, i};

            /*
             * The amount of storage required for the merge accounts
             * for the cost of storing the new records, along with the
             * cost of retaining the old records during the process
             * (hence the 2x multiplier).
             *
             * FIXME: currently does not account for the *actual* size
             * of the shards, only the storage for the records
             * themselves.
             */
            size_t reccnt = m_levels[i-1]->get_record_count();
            if constexpr (L == LayoutPolicy::LEVELING) {
                if (can_merge_with(i, reccnt)) {
                    reccnt += m_levels[i]->get_record_count();
                }
            }
            //task.m_size = 2 * reccnt * sizeof(R);

            merges.push_back(task);
        }

        return merges;
    }

    /*
     * Merge the level specified by incoming_level into the level specified
     * by base_level. The two levels should be sequential--i.e., no levels
     * are skipped in the merge process--otherwise the tombstone ordering
     * invariant may be violated by the merge operation.
     */
    inline void merge_levels(level_index base_level, level_index incoming_level) {
        if constexpr (L == LayoutPolicy::LEVELING) {
            /* under leveling, the base level is replaced by the merge of the two levels */
            m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
        } else {
            /* under tiering, the incoming level's shards are appended to the base level */
            m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
            m_levels[base_level]->finalize();
        }

        /* the incoming level is now logically empty; replace it with a fresh one */
        m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
    }
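
    /*
     * Design note: under LEVELING the base level ends up holding a single
     * merged shard, while under tiering the incoming records are appended
     * as an additional shard alongside the base level's existing ones.
     * Either way, the incoming level is left empty afterward.
     */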

private:
    size_t m_scale_factor;
    double m_max_delete_prop;
    size_t m_buffer_size;

    std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;

    /*
     * Add a new (empty) level to the bottom of the structure and return
     * that level's index.
     */
    inline level_index grow() {
        level_index new_idx = m_levels.size();
        size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;

        m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
        return new_idx;
    }
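
    /*
     * Illustrative note (scale factor assumed): with m_scale_factor = 8, a
     * newly grown level is configured for a single shard under LEVELING
     * and for up to 8 shards under tiering.
     */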

    /*
     * Find the first level below the level indicated by idx that
     * is capable of sustaining a merge operation and return its
     * level index. If no such level exists, returns -1. Also
     * returns -1 if idx == 0 and the structure contains no levels,
     * to simplify the logic of the first merge.
     */
    inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
        if (idx == 0 && m_levels.size() == 0) return -1;

        size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
        for (level_index i=idx+1; i<m_levels.size(); i++) {
            if (can_merge_with(i, incoming_rec_cnt)) {
                return i;
            }

            incoming_rec_cnt = get_level_record_count(i);
        }

        return -1;
    }

    inline void merge_buffer_into_l0(Buffer *buffer) {
        assert(m_levels[0]);
        if constexpr (L == LayoutPolicy::LEVELING) {
            // FIXME: Kludgey implementation due to interface constraints.
            auto old_level = m_levels[0].get();
            auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
            temp_level->append_buffer(buffer);
            auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);

            m_levels[0] = new_level;
            delete temp_level;
        } else {
            m_levels[0]->append_buffer(buffer);
        }
    }

    /*
     * Mark a given memory level as no-longer in use by the tree. For now
     * this will just release the reference to the level. In the future,
     * this will be more complex, as the level may not be able to be
     * immediately deleted, depending upon who else is using it.
     */
    inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
        level.reset();
    }

    /*
     * Determine the record capacity of the level with index idx. Each
     * level is a scale factor larger than the one above it, with level 0
     * being one scale factor larger than the buffer. The buffer itself is
     * index -1, for which this simply returns the buffer capacity.
     */
    inline size_t calc_level_record_capacity(level_index idx) {
        return m_buffer_size * pow(m_scale_factor, idx+1);
    }
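
    /*
     * Example (values assumed for illustration): with m_buffer_size = 1000
     * and m_scale_factor = 8, calc_level_record_capacity(-1) = 1000 (the
     * buffer), calc_level_record_capacity(0) = 8000, and
     * calc_level_record_capacity(1) = 64000.
     */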

    /*
     * Returns the actual number of records present on a specified level.
     * An index value of -1 indicates the buffer, in which case a pointer
     * to the buffer to be measured must be passed in.
     */
    inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
        if (idx == -1) {
            assert(buffer);
            return buffer->get_record_count();
        }

        return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
    }

    /*
     * Determine if the level at index idx is able to absorb a merge of
     * incoming_rec_cnt additional records. The provided level index should
     * be non-negative (i.e., not refer to the buffer).
     */
    inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
        if (idx >= m_levels.size() || !m_levels[idx]) {
            return false;
        }

        if (L == LayoutPolicy::LEVELING) {
            return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
        } else {
            return m_levels[idx]->get_shard_count() < m_scale_factor;
        }
    }
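
    /*
     * Worked example (values assumed for illustration): under LEVELING
     * with m_buffer_size = 1000 and m_scale_factor = 8, level 0 holds at
     * most 8000 records; if it currently holds 7500, an incoming batch of
     * 600 records fails the check (8100 > 8000). Under tiering, the same
     * call succeeds whenever level 0 holds fewer than 8 shards, regardless
     * of record counts.
     */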
};
}