summaryrefslogtreecommitdiffstats
path: root/include/util/SortedMerge.h
blob: c41a7aea3cd199d7aeaec119e2300fac1049c85e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/*
 * include/util/SortedMerge.h
 *
 * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
 *
 * Distributed under the Modified BSD License.
 *
 * A sorted array merge routine for use in Shard construction, as many
 * shards will use a sorted array to represent their data. Also encapsulates
 * the necessary tombstone-cancellation logic.
 *
 * FIXME: include generic per-record processing functionality for Shards that
 * need it, to avoid needing to reprocess the array in the shard after
 * creation.
 */
#pragma once

#include <algorithm>

#include "framework/interface/Shard.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"

namespace de {

using psudb::BloomFilter;
using psudb::byte;
using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;

/*
 * A simple struct to return record_count and tombstone_count information
 * back to the caller. Could've been an std::pair, but I like the more
 * explicit names.
 */
struct merge_info {
  size_t record_count;
  size_t tombstone_count;
};

/*
 * Build a vector of cursors corresponding to the records contained within
 * a vector of shards. The cursor at index i in the output will correspond
 * to the shard at index i in the input.
 *
 * The values of reccnt and tscnt will be updated with the sum of the
 * records contained within the shards. Note that these counts include deleted
 * records that may be removed during shard construction, and so constitute
 * upper bounds only.
 */
template <RecordInterface R, ShardInterface S>
static std::vector<Cursor<Wrapped<R>>>
build_cursor_vec(std::vector<const S *> const &shards, size_t *reccnt,
                 size_t *tscnt) {
  std::vector<Cursor<Wrapped<R>>> cursors;
  cursors.reserve(shards.size());

  *reccnt = 0;
  *tscnt = 0;

  for (size_t i = 0; i < shards.size(); ++i) {
    if (shards[i]) {
      auto base = shards[i]->get_data();
      cursors.emplace_back(
          Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0,
                             shards[i]->get_record_count()});
      *reccnt += shards[i]->get_record_count();
      *tscnt += shards[i]->get_tombstone_count();
    } else {
      cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
    }
  }

  return cursors;
}

/*
 * Build a sorted array of records based on the contents of a BufferView.
 * This routine does not alter the buffer view, but rather copies the
 * records out and then sorts them. The provided buffer must be large
 * enough to store the records from the BufferView, or the behavior of the
 * function is undefined.
 *
 * It allocates a temporary buffer for the sorting, and execution of the
 * program will be aborted if the allocation fails.
 */
template <RecordInterface R>
static merge_info
sorted_array_from_bufferview(BufferView<R> bv, Wrapped<R> *buffer,
                             psudb::BloomFilter<R> *bf = nullptr) {
  /*
   * Copy the contents of the buffer view into a temporary buffer, and
   * sort them. We still need to iterate over these temporary records to
   * apply tombstone/deleted record filtering, as well as any possible
   * per-record processing that is required by the shard being built.
   */
  auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
      CACHELINE_SIZE, bv.get_record_count(), sizeof(Wrapped<R>));
  bv.copy_to_buffer((byte *)temp_buffer);

  auto base = temp_buffer;
  auto stop = base + bv.get_record_count();
  std::sort(base, stop, std::less<Wrapped<R>>());

  merge_info info = {0, 0};

  /*
   * Iterate over the temporary buffer to process the records, copying
   * them into buffer as needed
   */
  while (base < stop) {
    if (!base->is_tombstone() && (base + 1 < stop) &&
        base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
      base += 2;
      continue;
    } else if (base->is_deleted()) {
      base += 1;
      continue;
    }

    // FIXME: this shouldn't be necessary, but the tagged record
    // bypass doesn't seem to be working on this code-path, so this
    // ensures that tagged records from the buffer are able to be
    // dropped, eventually. It should only need to be &= 1
    base->header &= 3;
    buffer[info.record_count++] = *base;

    if (base->is_tombstone()) {
      info.tombstone_count++;
      if (bf) {
        bf->insert(base->rec);
      }
    }

    base++;
  }

  free(temp_buffer);
  return info;
}

/*
 * Perform a sorted merge of the records within cursors into the provided
 * buffer. Includes tombstone and tagged delete cancellation logic, and
 * will insert tombstones into a bloom filter, if one is provided.
 *
 * The behavior of this function is undefined if the provided buffer does
 * not have space to contain all of the records within the input cursors.
 */
template <RecordInterface R>
static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors,
                                     Wrapped<R> *buffer,
                                     psudb::BloomFilter<R> *bf = nullptr) {

  // FIXME: For smaller cursor arrays, it may be more efficient to skip
  //        the priority queue and just do a scan.
  PriorityQueue<Wrapped<R>> pq(cursors.size());
  for (size_t i = 0; i < cursors.size(); i++) {
    pq.push(cursors[i].ptr, i);
  }

  merge_info info = {0, 0};
  while (pq.size()) {
    auto now = pq.peek();
    auto next =
        pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
    /*
     * if the current record is not a tombstone, and the next record is
     * a tombstone that matches the current one, then the current one
     * has been deleted, and both it and its tombstone can be skipped
     * over.
     */
    if (!now.data->is_tombstone() && next.data != nullptr &&
        now.data->rec == next.data->rec && next.data->is_tombstone()) {

      pq.pop();
      pq.pop();
      auto &cursor1 = cursors[now.version];
      auto &cursor2 = cursors[next.version];
      if (advance_cursor(cursor1))
        pq.push(cursor1.ptr, now.version);
      if (advance_cursor(cursor2))
        pq.push(cursor2.ptr, next.version);
    } else {
      auto &cursor = cursors[now.version];
      /* skip over records that have been deleted via tagging */
      if (!cursor.ptr->is_deleted()) {
        buffer[info.record_count++] = *cursor.ptr;

        /*
         * if the record is a tombstone, increment the ts count and
         * insert it into the bloom filter if one has been
         * provided.
         */
        if (cursor.ptr->is_tombstone()) {
          info.tombstone_count++;
          if (bf) {
            bf->insert(cursor.ptr->rec);
          }
        }
      }
      pq.pop();

      if (advance_cursor(cursor))
        pq.push(cursor.ptr, now.version);
    }
  }

  return info;
}

} // namespace de