summaryrefslogtreecommitdiffstats
path: root/include/shard/ISAMTree.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-02-07 17:23:23 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-02-07 17:24:50 -0500
commitbd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch)
tree8e40038feaa9c83c4da967ab78564c51fc67a653 /include/shard/ISAMTree.h
parent2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff)
downloaddynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some tests. There's still some work to be done in creating tests for the weighted sampling operations for the alias and aug btree shards.
Diffstat (limited to 'include/shard/ISAMTree.h')
-rw-r--r--include/shard/ISAMTree.h105
1 files changed, 11 insertions, 94 deletions
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 7de9cb1..33ba82f 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -17,9 +17,8 @@
#include "framework/ShardRequirements.h"
#include "util/bf_config.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
-#include "psu-util/timer.h"
+#include "psu-ds/BloomFilter.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
@@ -61,60 +60,18 @@ public:
, m_alloc_size(0)
, m_data(nullptr)
{
- TIMER_INIT();
-
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- TIMER_START();
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
- TIMER_STOP();
-
- auto sort_time = TIMER_RESULT();
-
- TIMER_START();
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- if (m_bf && base->is_tombstone()) {
- ++m_tombstone_cnt;
- m_bf->insert(base->rec);
- }
-
- base++;
- }
-
- TIMER_STOP();
- auto copy_time = TIMER_RESULT();
+ auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- TIMER_START();
if (m_reccnt > 0) {
build_internal_levels();
}
- TIMER_STOP();
- auto level_time = TIMER_RESULT();
-
- free(temp_buffer);
}
ISAMTree(std::vector<ISAMTree*> &shards)
@@ -128,58 +85,18 @@ public:
, m_alloc_size(0)
, m_data(nullptr)
{
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(shards.size());
-
- PriorityQueue<Wrapped<R>> pq(shards.size());
-
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < shards.size(); ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count);
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- if (cursor.ptr->is_tombstone()) {
- //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n",
- //now.version, next.version);
- ++m_tombstone_cnt;
- m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
build_internal_levels();
@@ -225,11 +142,11 @@ public:
size_t get_memory_usage() {
- return m_alloc_size;
+ return m_alloc_size + m_internal_node_cnt * NODE_SZ;
}
size_t get_aux_memory_usage() {
- return m_bf->memory_usage();
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
/* SortedShardInterface methods */