diff options
Diffstat (limited to 'include/shard/ISAMTree.h')
| -rw-r--r-- | include/shard/ISAMTree.h | 105 |
1 files changed, 11 insertions, 94 deletions
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 7de9cb1..33ba82f 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -17,9 +17,8 @@ #include "framework/ShardRequirements.h" #include "util/bf_config.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" -#include "psu-util/timer.h" +#include "psu-ds/BloomFilter.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; @@ -61,60 +60,18 @@ public: , m_alloc_size(0) , m_data(nullptr) { - TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); - TIMER_START(); - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - TIMER_STOP(); - - auto sort_time = TIMER_RESULT(); - - TIMER_START(); - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - if (m_bf && base->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(base->rec); - } - - base++; - } - - TIMER_STOP(); - auto copy_time = TIMER_RESULT(); + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - TIMER_START(); if (m_reccnt > 0) { build_internal_levels(); } - TIMER_STOP(); - auto level_time = TIMER_RESULT(); - - free(temp_buffer); } ISAMTree(std::vector<ISAMTree*> &shards) @@ -128,58 +85,18 @@ public: , m_alloc_size(0) , m_data(nullptr) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - PriorityQueue<Wrapped<R>> pq(shards.size()); - size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } + auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count); m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - if (cursor.ptr->is_tombstone()) { - //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n", - //now.version, next.version); - ++m_tombstone_cnt; - m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { build_internal_levels(); @@ -225,11 +142,11 @@ public: size_t get_memory_usage() { - return m_alloc_size; + return m_alloc_size + m_internal_node_cnt * NODE_SZ; } size_t get_aux_memory_usage() { - return m_bf->memory_usage(); + return (m_bf) ? m_bf->memory_usage() : 0; } /* SortedShardInterface methods */ |