diff options
Diffstat (limited to 'include/shard/TrieSpline.h')
| -rw-r--r-- | include/shard/TrieSpline.h | 149 |
1 files changed, 23 insertions, 126 deletions
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 9473177..f9fb3cb 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -15,11 +15,9 @@ #include "framework/ShardRequirements.h" #include "ts/builder.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" #include "psu-ds/BloomFilter.h" #include "util/bf_config.h" -#include "psu-util/timer.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; @@ -45,78 +43,26 @@ public: , m_min_key(0) , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) { - TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); - TIMER_START(); - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - - K min_key = base->rec.key; - K max_key = (stop-1)->rec.key; - TIMER_STOP(); - - auto sort_time = TIMER_RESULT(); - - TIMER_START(); - auto bldr = ts::Builder<K>(min_key, max_key, E); - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - bldr.AddKey(base->rec.key); - if (m_bf && base->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(base->rec); - } + if (m_reccnt > 0) { + m_min_key = m_data[0].rec.key; + m_max_key = m_data[m_reccnt-1].rec.key; - /* - * determine the "true" min/max keys based on the scan. This is - * to avoid situations where the min/max in the input array - * are deleted and don't survive into the structure itself. - */ - if (m_reccnt == 0) { - m_max_key = m_min_key = base->rec.key; - } else if (base->rec.key > m_max_key) { - m_max_key = base->rec.key; - } else if (base->rec.key < m_min_key) { - m_min_key = base->rec.key; + auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); + for (size_t i=0; i<m_reccnt; i++) { + bldr.AddKey(m_data[i].rec.key); } - base++; - } - - TIMER_STOP(); - auto copy_time = TIMER_RESULT(); - - TIMER_START(); - if (m_reccnt > 0) { m_ts = bldr.Finalize(); } - TIMER_STOP(); - auto level_time = TIMER_RESULT(); - - free(temp_buffer); } TrieSpline(std::vector<TrieSpline*> &shards) @@ -128,77 +74,28 @@ public: , m_min_key(0) , m_bf(nullptr) { - - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - PriorityQueue<Wrapped<R>> pq(shards.size()); - size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - /* - * Initialize m_max_key and m_min_key using the values from the - * first shard. These will later be updated when building - * the initial priority queue to their true values. - */ - m_max_key = shards[0]->m_max_key; - m_min_key = shards[0]->m_min_key; + auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count); - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - - if (shards[i]->m_max_key > m_max_key) { - m_max_key = shards[i]->m_max_key; - } - - if (shards[i]->m_min_key < m_min_key) { - m_min_key = shards[i]->m_min_key; - } - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } - m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); - auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - bldr.AddKey(cursor.ptr->rec.key); - if (cursor.ptr->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { + m_min_key = m_data[0].rec.key; + m_max_key = m_data[m_reccnt-1].rec.key; + + auto bldr = ts::Builder<K>(m_min_key, m_max_key, E); + for (size_t i=0; i<m_reccnt; i++) { + bldr.AddKey(m_data[i].rec.key); + } + m_ts = bldr.Finalize(); } } @@ -250,7 +147,7 @@ public: } size_t get_aux_memory_usage() { - return 0; + return (m_bf) ? m_bf->memory_usage() : 0; } size_t get_lower_bound(const K& key) const { |