diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-07 17:23:23 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-07 17:24:50 -0500 |
| commit | bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch) | |
| tree | 8e40038feaa9c83c4da967ab78564c51fc67a653 /include/shard/ISAMTree.h | |
| parent | 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff) | |
| download | dynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz | |
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some
tests. There's still some work to be done in creating tests for the
weighted sampling operations for the alias and aug btree shards.
Diffstat (limited to 'include/shard/ISAMTree.h')
| -rw-r--r-- | include/shard/ISAMTree.h | 105 |
1 files changed, 11 insertions, 94 deletions
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 7de9cb1..33ba82f 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -17,9 +17,8 @@ #include "framework/ShardRequirements.h" #include "util/bf_config.h" -#include "psu-ds/PriorityQueue.h" -#include "util/Cursor.h" -#include "psu-util/timer.h" +#include "psu-ds/BloomFilter.h" +#include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; using psudb::BloomFilter; @@ -61,60 +60,18 @@ public: , m_alloc_size(0) , m_data(nullptr) { - TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>), (byte**) &m_data); - TIMER_START(); - auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less<Wrapped<R>>()); - TIMER_STOP(); - - auto sort_time = TIMER_RESULT(); - - TIMER_START(); - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - m_data[m_reccnt++] = *base; - if (m_bf && base->is_tombstone()) { - ++m_tombstone_cnt; - m_bf->insert(base->rec); - } - - base++; - } - - TIMER_STOP(); - auto copy_time = TIMER_RESULT(); + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - TIMER_START(); if (m_reccnt > 0) { build_internal_levels(); } - TIMER_STOP(); - auto level_time = TIMER_RESULT(); - - free(temp_buffer); } ISAMTree(std::vector<ISAMTree*> &shards) @@ -128,58 +85,18 @@ public: , m_alloc_size(0) , m_data(nullptr) { - std::vector<Cursor<Wrapped<R>>> cursors; - cursors.reserve(shards.size()); - - PriorityQueue<Wrapped<R>> pq(shards.size()); - size_t attemp_reccnt = 0; size_t tombstone_count = 0; - - for (size_t i = 0; i < shards.size(); ++i) { - if (shards[i]) { - auto base = shards[i]->get_data(); - cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); - attemp_reccnt += shards[i]->get_record_count(); - tombstone_count += shards[i]->get_tombstone_count(); - pq.push(cursors[i].ptr, i); - } else { - cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); - } - } + auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count); m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **) &m_data); - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - if (!cursor.ptr->is_deleted()) { - m_data[m_reccnt++] = *cursor.ptr; - if (cursor.ptr->is_tombstone()) { - //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n", - //now.version, next.version); - ++m_tombstone_cnt; - m_bf->insert(cursor.ptr->rec); - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + auto res = sorted_array_merge<R>(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; if (m_reccnt > 0) { build_internal_levels(); @@ -225,11 +142,11 @@ public: size_t get_memory_usage() { - return m_alloc_size; + return m_alloc_size + m_internal_node_cnt * NODE_SZ; } size_t get_aux_memory_usage() { - return m_bf->memory_usage(); + return (m_bf) ? m_bf->memory_usage() : 0; } /* SortedShardInterface methods */ |