summary refs log tree commit diff stats
diff options
context:
space:
mode:
author: Douglas Rumbaugh <dbr4@psu.edu> 2024-02-07 17:23:23 -0500
committer: Douglas Rumbaugh <dbr4@psu.edu> 2024-02-07 17:24:50 -0500
commit: bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae (patch)
tree: 8e40038feaa9c83c4da967ab78564c51fc67a653
parent: 2c5d549b3618b9ea72e6eece4cb4f3da5a6811a8 (diff)
download: dynamic-extension-bd74e27b28bd95267ce50d2e4b6f12b51d9b6aae.tar.gz
Cleaned up shard files (except VPTree)
Cleaned up shard implementations, fixed a few bugs, and set up some tests. There's still some work to be done in creating tests for the weighted sampling operations for the alias and aug btree shards.
-rw-r--r-- CMakeLists.txt 29
-rw-r--r-- include/query/wss.h 2
-rw-r--r-- include/shard/Alias.h 155
-rw-r--r-- include/shard/AugBTree.h 150
-rw-r--r-- include/shard/ISAMTree.h 105
-rw-r--r-- include/shard/PGM.h 140
-rw-r--r-- include/shard/TrieSpline.h 149
-rw-r--r-- include/util/SortedMerge.h 185
-rw-r--r-- tests/alias_tests.cpp 352
-rw-r--r-- tests/augbtree_tests.cpp 360
-rw-r--r-- tests/de_level_concurrent.cpp 4
-rw-r--r-- tests/de_level_tag.cpp 3
-rw-r--r-- tests/de_level_tomb.cpp 3
-rw-r--r-- tests/de_tier_concurrent.cpp 3
-rw-r--r-- tests/de_tier_tag.cpp 3
-rw-r--r-- tests/de_tier_tomb.cpp 3
-rw-r--r-- tests/include/concurrent_extension.h 40
-rw-r--r-- tests/include/dynamic_extension.h 36
-rw-r--r-- tests/include/rangecount.h 57
-rw-r--r-- tests/include/rangequery.h 60
-rw-r--r-- tests/include/shard_standard.h 26
-rw-r--r-- tests/include/wirs.h 181
-rw-r--r-- tests/include/wss.h 144
-rw-r--r-- tests/memisam_tests.cpp 8
-rw-r--r-- tests/pgm_tests.cpp 325
-rw-r--r-- tests/rangecount_tests.cpp 1
-rw-r--r-- tests/rangequery_tests.cpp 3
-rw-r--r-- tests/triespline_tests.cpp 6
28 files changed, 859 insertions, 1674 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e7426b8..5f77396 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench true)
set(old_bench False)
@@ -33,9 +33,10 @@ if (tests)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin/tests")
file(MAKE_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/tests/data")
- #add_executable(augbtree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/augbtree_tests.cpp)
- #target_link_libraries(augbtree_tests PUBLIC gsl check subunit pthread)
- #target_include_directories(augbtree_tests PRIVATE include external/psudb-common/cpp/include external/ctpl)
+ add_executable(augbtree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/augbtree_tests.cpp)
+ target_link_libraries(augbtree_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(augbtree_tests PUBLIC -mcx16)
+ target_include_directories(augbtree_tests PRIVATE include external/psudb-common/cpp/include external/ctpl)
add_executable(internal_level_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/internal_level_tests.cpp)
target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread atomic)
@@ -103,18 +104,16 @@ if (tests)
target_link_options(triespline_tests PUBLIC -mcx16)
target_include_directories(triespline_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include)
- #add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp)
- #target_link_libraries(alias_tests PUBLIC gsl check subunit pthread)
- #target_include_directories(alias_tests PRIVATE include external/psudb-common/cpp/include)
+ add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp)
+ target_link_libraries(alias_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(alias_tests PUBLIC -mcx16)
+ target_include_directories(alias_tests PRIVATE include external/psudb-common/cpp/include)
- #add_executable(triespline_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_tests.cpp)
- #target_link_libraries(triespline_tests PUBLIC gsl check subunit pthread)
- #target_include_directories(triespline_tests PRIVATE include external/PLEX/include external/psudb-common/cpp/include)
-
- #add_executable(pgm_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/pgm_tests.cpp)
- #target_link_libraries(pgm_tests PUBLIC gsl check subunit pthread gomp)
- #target_include_directories(pgm_tests PRIVATE include external/PGM-index/include external/psudb-common/cpp/include)
- #target_compile_options(pgm_tests PUBLIC -fopenmp)
+ add_executable(pgm_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/pgm_tests.cpp)
+ target_link_libraries(pgm_tests PUBLIC gsl check subunit pthread gomp atomic)
+ target_include_directories(pgm_tests PRIVATE include external/PGM-index/include external/psudb-common/cpp/include)
+ target_link_options(pgm_tests PUBLIC -mcx16)
+ target_compile_options(pgm_tests PUBLIC -fopenmp)
endif()
if (bench)
diff --git a/include/query/wss.h b/include/query/wss.h
index ea36cb2..8797035 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -60,7 +60,7 @@ public:
return res;
}
- static void* get_buffer_query_state(BufferState<R> *buffer, void *parms) {
+ static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
BufferState<R> *state = new BufferState<R>();
auto parameters = (Parms<R>*) parms;
if constexpr (Rejection) {
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index a234575..f0d1d59 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -14,20 +14,19 @@
#pragma once
#include <vector>
-#include <cassert>
#include "framework/ShardRequirements.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
using psudb::PriorityQueue;
using psudb::queue_record;
+using psudb::byte;
namespace de {
@@ -41,126 +40,73 @@ private:
typedef decltype(R::weight) W;
public:
- Alias(BufferView<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_alias(nullptr), m_bf(nullptr) {
-
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
-
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- std::vector<W> weights;
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- wss_cancelations++;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
+ Alias(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_alias(nullptr)
+ , m_total_weight(0)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0)
+ , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) {
+
+
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+
+ auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- m_total_weight+= base->rec.weight;
- weights.push_back(base->rec.weight);
-
- if (m_bf && base->is_tombstone()) {
- m_tombstone_cnt++;
- m_bf->insert(base->rec);
+ if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i=0; i<m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
}
-
- base++;
- }
- if (m_reccnt > 0) {
build_alias_structure(weights);
}
}
Alias(std::vector<Alias*> &shards)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_alias(nullptr), m_bf(nullptr) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(shards.size());
-
- PriorityQueue<Wrapped<R>> pq(shards.size());
+ : m_data(nullptr)
+ , m_alias(nullptr)
+ , m_total_weight(0)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0)
+ , m_bf(nullptr) {
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < shards.size(); ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count);
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- std::vector<W> weights;
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- m_total_weight += cursor.ptr->rec.weight;
- weights.push_back(cursor.ptr->rec.weight);
- if (m_bf && cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i=0; i<m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
+ }
+
build_alias_structure(weights);
}
}
~Alias() {
- if (m_data) free(m_data);
- if (m_alias) delete m_alias;
- if (m_bf) delete m_bf;
-
+ free(m_data);
+ delete m_alias;
+ delete m_bf;
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
@@ -173,7 +119,7 @@ public:
return nullptr;
}
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
+ while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx;
if (m_data[idx].rec == rec) {
return m_data + idx;
@@ -205,7 +151,7 @@ public:
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
W get_total_weight() {
@@ -254,7 +200,6 @@ private:
W m_total_weight;
size_t m_reccnt;
size_t m_tombstone_cnt;
- size_t m_group_size;
size_t m_alloc_size;
BloomFilter<R> *m_bf;
};
diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h
index be664ac..58bd098 100644
--- a/include/shard/AugBTree.h
+++ b/include/shard/AugBTree.h
@@ -16,28 +16,21 @@
#include <vector>
#include <cassert>
-#include <queue>
-#include <memory>
-#include <concepts>
#include "framework/ShardRequirements.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
-using psudb::PriorityQueue;
-using psudb::queue_record;
using psudb::Alias;
+using psudb::byte;
namespace de {
-thread_local size_t wirs_cancelations = 0;
-
template <WeightedRecordInterface R>
struct AugBTreeNode {
struct AugBTreeNode<R> *left, *right;
@@ -54,108 +47,52 @@ private:
typedef decltype(R::weight) W;
public:
- AugBTree(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
-
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- wirs_cancelations++;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- m_total_weight+= base->rec.weight;
-
- if (m_bf && base->is_tombstone()) {
- m_tombstone_cnt++;
- m_bf->insert(base->rec);
- }
-
- base++;
- }
+ AugBTree(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_root(nullptr)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_group_size(0)
+ , m_alloc_size(0)
+ , m_node_cnt(0)
+ , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ {
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+
+ auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
build_wirs_structure();
}
}
- AugBTree(AugBTree** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
-
- PriorityQueue<Wrapped<R>> pq(len);
-
+ AugBTree(std::vector<AugBTree*> shards)
+ : m_data(nullptr)
+ , m_root(nullptr)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_group_size(0)
+ , m_alloc_size(0)
+ , m_node_cnt(0)
+ , m_bf(nullptr)
+ {
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < len; ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, AugBTree>(shards, &attemp_reccnt, &tombstone_count);
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- m_total_weight += cursor.ptr->rec.weight;
- if (m_bf && cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
build_wirs_structure();
@@ -163,13 +100,12 @@ public:
}
~AugBTree() {
- if (m_data) free(m_data);
+ free(m_data);
for (size_t i=0; i<m_alias.size(); i++) {
- if (m_alias[i]) delete m_alias[i];
+ delete m_alias[i];
}
- if (m_bf) delete m_bf;
-
+ delete m_bf;
free_tree(m_root);
}
@@ -183,7 +119,7 @@ public:
return nullptr;
}
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
+ while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx;
if (m_data[idx].rec == rec) {
return m_data + idx;
@@ -209,13 +145,12 @@ public:
return m_data + idx;
}
-
size_t get_memory_usage() {
return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
size_t get_lower_bound(const K& key) const {
@@ -364,7 +299,6 @@ private:
Wrapped<R>* m_data;
std::vector<Alias *> m_alias;
AugBTreeNode<R>* m_root;
- W m_total_weight;
size_t m_reccnt;
size_t m_tombstone_cnt;
size_t m_group_size;
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 7de9cb1..33ba82f 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -17,9 +17,8 @@
#include "framework/ShardRequirements.h"
#include "util/bf_config.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
-#include "psu-util/timer.h"
+#include "psu-ds/BloomFilter.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
@@ -61,60 +60,18 @@ public:
, m_alloc_size(0)
, m_data(nullptr)
{
- TIMER_INIT();
-
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- TIMER_START();
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
- TIMER_STOP();
-
- auto sort_time = TIMER_RESULT();
-
- TIMER_START();
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- if (m_bf && base->is_tombstone()) {
- ++m_tombstone_cnt;
- m_bf->insert(base->rec);
- }
-
- base++;
- }
-
- TIMER_STOP();
- auto copy_time = TIMER_RESULT();
+ auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- TIMER_START();
if (m_reccnt > 0) {
build_internal_levels();
}
- TIMER_STOP();
- auto level_time = TIMER_RESULT();
-
- free(temp_buffer);
}
ISAMTree(std::vector<ISAMTree*> &shards)
@@ -128,58 +85,18 @@ public:
, m_alloc_size(0)
, m_data(nullptr)
{
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(shards.size());
-
- PriorityQueue<Wrapped<R>> pq(shards.size());
-
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < shards.size(); ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count);
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- if (cursor.ptr->is_tombstone()) {
- //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n",
- //now.version, next.version);
- ++m_tombstone_cnt;
- m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
build_internal_levels();
@@ -225,11 +142,11 @@ public:
size_t get_memory_usage() {
- return m_alloc_size;
+ return m_alloc_size + m_internal_node_cnt * NODE_SZ;
}
size_t get_aux_memory_usage() {
- return m_bf->memory_usage();
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
/* SortedShardInterface methods */
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 13db26a..8031870 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -14,24 +14,19 @@
#include <vector>
-#include <cassert>
-#include <queue>
-#include <memory>
-#include <concepts>
#include "framework/ShardRequirements.h"
#include "pgm/pgm_index.hpp"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
+#include "util/SortedMerge.h"
#include "util/bf_config.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::Alias;
+using psudb::byte;
namespace de {
@@ -41,111 +36,65 @@ private:
typedef decltype(R::key) K;
typedef decltype(R::value) V;
-
public:
- PGM(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0) {
-
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
- std::vector<K> keys;
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
+ PGM(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+ auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- K min_key = base->rec.key;
- K max_key = (stop - 1)->rec.key;
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
+ if (m_reccnt > 0) {
+ std::vector<K> keys;
+ for (size_t i=0; i<m_reccnt; i++) {
+ keys.emplace_back(m_data[i].rec.key);
}
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- keys.emplace_back(base->rec.key);
- base++;
- }
-
- if (m_reccnt > 0) {
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
- PGM(PGM** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
-
- PriorityQueue<Wrapped<R>> pq(len);
-
+ PGM(std::vector<PGM*> shards)
+ : m_data(nullptr)
+ , m_bf(nullptr)
+ , m_reccnt(0)
+ , m_tombstone_cnt(0)
+ , m_alloc_size(0) {
+
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- for (size_t i = 0; i < len; ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
-
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
+ auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- std::vector<K> keys;
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- keys.emplace_back(cursor.ptr->rec.key);
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
+
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
+ std::vector<K> keys;
+ for (size_t i=0; i<m_reccnt; i++) {
+ keys.emplace_back(m_data[i].rec.key);
+ }
+
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
~PGM() {
- if (m_data) free(m_data);
+ free(m_data);
+ delete m_bf;
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
@@ -186,7 +135,7 @@ public:
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
size_t get_lower_bound(const K& key) const {
@@ -228,6 +177,7 @@ public:
private:
Wrapped<R>* m_data;
+ BloomFilter<R> *m_bf;
size_t m_reccnt;
size_t m_tombstone_cnt;
size_t m_alloc_size;
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 9473177..f9fb3cb 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -15,11 +15,9 @@
#include "framework/ShardRequirements.h"
#include "ts/builder.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "psu-util/timer.h"
+#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
@@ -45,78 +43,26 @@ public:
, m_min_key(0)
, m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
{
- TIMER_INIT();
-
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- TIMER_START();
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- K min_key = base->rec.key;
- K max_key = (stop-1)->rec.key;
- TIMER_STOP();
-
- auto sort_time = TIMER_RESULT();
-
- TIMER_START();
- auto bldr = ts::Builder<K>(min_key, max_key, E);
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
+ auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- bldr.AddKey(base->rec.key);
- if (m_bf && base->is_tombstone()) {
- ++m_tombstone_cnt;
- m_bf->insert(base->rec);
- }
+ if (m_reccnt > 0) {
+ m_min_key = m_data[0].rec.key;
+ m_max_key = m_data[m_reccnt-1].rec.key;
- /*
- * determine the "true" min/max keys based on the scan. This is
- * to avoid situations where the min/max in the input array
- * are deleted and don't survive into the structure itself.
- */
- if (m_reccnt == 0) {
- m_max_key = m_min_key = base->rec.key;
- } else if (base->rec.key > m_max_key) {
- m_max_key = base->rec.key;
- } else if (base->rec.key < m_min_key) {
- m_min_key = base->rec.key;
+ auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
+ for (size_t i=0; i<m_reccnt; i++) {
+ bldr.AddKey(m_data[i].rec.key);
}
- base++;
- }
-
- TIMER_STOP();
- auto copy_time = TIMER_RESULT();
-
- TIMER_START();
- if (m_reccnt > 0) {
m_ts = bldr.Finalize();
}
- TIMER_STOP();
- auto level_time = TIMER_RESULT();
-
- free(temp_buffer);
}
TrieSpline(std::vector<TrieSpline*> &shards)
@@ -128,77 +74,28 @@ public:
, m_min_key(0)
, m_bf(nullptr)
{
-
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(shards.size());
-
- PriorityQueue<Wrapped<R>> pq(shards.size());
-
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
-
- /*
- * Initialize m_max_key and m_min_key using the values from the
- * first shard. These will later be updated when building
- * the initial priority queue to their true values.
- */
- m_max_key = shards[0]->m_max_key;
- m_min_key = shards[0]->m_min_key;
+ auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
- for (size_t i = 0; i < shards.size(); ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
-
- if (shards[i]->m_max_key > m_max_key) {
- m_max_key = shards[i]->m_max_key;
- }
-
- if (shards[i]->m_min_key < m_min_key) {
- m_min_key = shards[i]->m_min_key;
- }
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
-
m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- bldr.AddKey(cursor.ptr->rec.key);
- if (cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
if (m_reccnt > 0) {
+ m_min_key = m_data[0].rec.key;
+ m_max_key = m_data[m_reccnt-1].rec.key;
+
+ auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
+ for (size_t i=0; i<m_reccnt; i++) {
+ bldr.AddKey(m_data[i].rec.key);
+ }
+
m_ts = bldr.Finalize();
}
}
@@ -250,7 +147,7 @@ public:
}
size_t get_aux_memory_usage() {
- return 0;
+ return (m_bf) ? m_bf->memory_usage() : 0;
}
size_t get_lower_bound(const K& key) const {
diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h
new file mode 100644
index 0000000..ed47acb
--- /dev/null
+++ b/include/util/SortedMerge.h
@@ -0,0 +1,185 @@
+/*
+ * include/util/SortedMerge.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A sorted array merge routine for use in Shard construction, as many
+ * shards will use a sorted array to represent their data. Also encapsulates
+ * the necessary tombstone-cancellation logic.
+ *
+ * FIXME: include generic per-record processing functionality for Shards that
+ * need it, to avoid needing to reprocess the array in the shard after
+ * creation.
+ */
+#pragma once
+
+#include "util/Cursor.h"
+#include "framework/interface/Shard.h"
+#include "psu-ds/PriorityQueue.h"
+
+namespace de {
+
+using psudb::PriorityQueue;
+using psudb::BloomFilter;
+using psudb::queue_record;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
+
+/*
+ * Result of a merge: the counts reported by sorted_array_merge() and
+ * sorted_array_from_bufferview() for the records they wrote to their
+ * output buffer.
+ */
+struct merge_info {
+ /* number of records copied into the output buffer */
+ size_t record_count;
+ /* how many of the copied records are tombstones */
+ size_t tombstone_count;
+};
+
+
+/*
+ * Build one cursor per entry of shards, in the same order, and report the
+ * summed record and tombstone counts of the non-null shards through reccnt
+ * and tscnt. A null shard gets an all-null placeholder cursor, so cursor
+ * indices stay aligned with shard indices.
+ */
+template <RecordInterface R, ShardInterface<R> S>
+static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards, size_t *reccnt, size_t *tscnt) {
+    *reccnt = 0;
+    *tscnt = 0;
+
+    std::vector<Cursor<Wrapped<R>>> result;
+    result.reserve(shards.size());
+
+    for (S *shard : shards) {
+        /* placeholder keeps the cursor index equal to the shard index */
+        if (!shard) {
+            result.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
+            continue;
+        }
+
+        auto first = shard->get_data();
+        auto count = shard->get_record_count();
+
+        result.emplace_back(Cursor{first, first + count, 0, count});
+        *reccnt += count;
+        *tscnt += shard->get_tombstone_count();
+    }
+
+    return result;
+}
+
+/*
+ * Build a sorted array of records in buffer from the contents of the
+ * buffer view bv. A record that is immediately followed (in sorted order)
+ * by a tombstone for the same record is dropped together with that
+ * tombstone, and records flagged as deleted are skipped. Tombstones that
+ * survive are inserted into bf, if one is provided.
+ *
+ * Returns the number of records and the number of tombstones written to
+ * buffer. No bounds check is made on buffer; presumably the caller must
+ * size it for every record in bv -- TODO confirm against callers.
+ */
+template <RecordInterface R>
+static merge_info sorted_array_from_bufferview(BufferView<R> bv,
+ Wrapped<R> *buffer,
+ psudb::BloomFilter<R> *bf=nullptr) {
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+ bv.get_record_count(),
+ sizeof(Wrapped<R>));
+ bv.copy_to_buffer((byte *) temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + bv.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ merge_info info = {0, 0};
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ /*
+ * A non-tombstone record directly followed by its own tombstone has
+ * been deleted: skip both. Otherwise skip records flagged as
+ * deleted.
+ */
+ if (!base->is_tombstone() && (base + 1 < stop)
+ && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
+
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code path, so masking
+ // the header ensures that tagged records from the buffer can
+ // eventually be dropped. It should only need to be &= 1.
+ base->header &= 3;
+ buffer[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (bf){
+ bf->insert(base->rec);
+ }
+ }
+
+ base++;
+ }
+
+ free(temp_buffer);
+ return info;
+}
+
+/*
+ * Perform a sorted merge of the records within cursors into the provided
+ * buffer. Includes tombstone and tagged delete cancellation logic, and
+ * will insert tombstones into a bloom filter, if one is provided.
+ *
+ * Cursors whose ptr is null (the placeholders that build_cursor_vec emits
+ * for null shards) are ignored.
+ *
+ * Returns the number of records and the number of tombstones written to
+ * buffer.
+ *
+ * The behavior of this function is undefined if the provided buffer does
+ * not have space to contain all of the records within the input cursors.
+ */
+template <RecordInterface R>
+static merge_info sorted_array_merge(std::vector<Cursor<Wrapped<R>>> &cursors,
+                                     Wrapped<R> *buffer,
+                                     psudb::BloomFilter<R> *bf=nullptr) {
+
+    // FIXME: For smaller cursor arrays, it may be more efficient to skip
+    //        the priority queue and just do a scan.
+    PriorityQueue<Wrapped<R>> pq(cursors.size());
+    for (size_t i=0; i<cursors.size(); i++) {
+        /*
+         * Placeholder cursors have a null ptr; queueing one would make the
+         * loop below dereference a null record, so leave them out.
+         *
+         * NOTE(review): a non-null cursor over zero records is still
+         * queued here; confirm that callers never supply one.
+         */
+        if (cursors[i].ptr != nullptr) {
+            pq.push(cursors[i].ptr, i);
+        }
+    }
+
+    merge_info info = {0, 0};
+    while (pq.size()) {
+        auto now = pq.peek();
+        auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+        /*
+         * if the current record is not a tombstone, and the next record is
+         * a tombstone that matches the current one, then the current one
+         * has been deleted, and both it and its tombstone can be skipped
+         * over.
+         */
+        if (!now.data->is_tombstone() && next.data != nullptr &&
+            now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+            pq.pop(); pq.pop();
+            auto& cursor1 = cursors[now.version];
+            auto& cursor2 = cursors[next.version];
+            if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+            if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+        } else {
+            auto& cursor = cursors[now.version];
+            /* skip over records that have been deleted via tagging */
+            if (!cursor.ptr->is_deleted()) {
+                buffer[info.record_count++] = *cursor.ptr;
+
+                /*
+                 * if the record is a tombstone, increment the ts count and
+                 * insert it into the bloom filter if one has been
+                 * provided.
+                 */
+                if (cursor.ptr->is_tombstone()) {
+                    info.tombstone_count++;
+                    if (bf) {
+                        bf->insert(cursor.ptr->rec);
+                    }
+                }
+            }
+            pq.pop();
+
+            if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+        }
+    }
+
+    return info;
+}
+
+
+
+}
diff --git a/tests/alias_tests.cpp b/tests/alias_tests.cpp
index e3c736b..98d0c63 100644
--- a/tests/alias_tests.cpp
+++ b/tests/alias_tests.cpp
@@ -12,358 +12,27 @@
#include "shard/Alias.h"
#include "query/wss.h"
-#include "testing.h"
+#include "framework/structure/MutableBuffer.h"
+#include "include/testing.h"
+
#include <check.h>
using namespace de;
-typedef Alias<WRec> Shard;
-
-START_TEST(t_mbuffer_init)
-{
- auto buffer = new MutableBuffer<WRec>(1024, 1024);
- for (uint64_t i = 512; i > 0; i--) {
- uint32_t v = i;
- buffer->append({i,v, 1});
- }
-
- for (uint64_t i = 1; i <= 256; ++i) {
- uint32_t v = i;
- buffer->append({i, v, 1}, true);
- }
-
- for (uint64_t i = 257; i <= 512; ++i) {
- uint32_t v = i + 1;
- buffer->append({i, v, 1});
- }
-
- Shard* shard = new Shard(buffer);
- ck_assert_uint_eq(shard->get_record_count(), 512);
-
- delete buffer;
- delete shard;
-}
-
-
-START_TEST(t_alias_init)
-{
- size_t n = 512;
- auto mbuffer1 = create_test_mbuffer<WRec>(n);
- auto mbuffer2 = create_test_mbuffer<WRec>(n);
- auto mbuffer3 = create_test_mbuffer<WRec>(n);
-
- auto shard1 = new Shard(mbuffer1);
- auto shard2 = new Shard(mbuffer2);
- auto shard3 = new Shard(mbuffer3);
-
- Shard* shards[3] = {shard1, shard2, shard3};
- auto shard4 = new Shard(shards, 3);
-
- ck_assert_int_eq(shard4->get_record_count(), n * 3);
- ck_assert_int_eq(shard4->get_tombstone_count(), 0);
-
- size_t total_cnt = 0;
- size_t shard1_idx = 0;
- size_t shard2_idx = 0;
- size_t shard3_idx = 0;
-
- for (size_t i = 0; i < shard4->get_record_count(); ++i) {
- auto rec1 = shard1->get_record_at(shard1_idx);
- auto rec2 = shard2->get_record_at(shard2_idx);
- auto rec3 = shard3->get_record_at(shard3_idx);
-
- auto cur_rec = shard4->get_record_at(i);
-
- if (shard1_idx < n && cur_rec->rec == rec1->rec) {
- ++shard1_idx;
- } else if (shard2_idx < n && cur_rec->rec == rec2->rec) {
- ++shard2_idx;
- } else if (shard3_idx < n && cur_rec->rec == rec3->rec) {
- ++shard3_idx;
- } else {
- assert(false);
- }
- }
-
- delete mbuffer1;
- delete mbuffer2;
- delete mbuffer3;
-
- delete shard1;
- delete shard2;
- delete shard3;
- delete shard4;
-}
-
-
-START_TEST(t_point_lookup)
-{
- size_t n = 10000;
-
- auto buffer = create_double_seq_mbuffer<WRec>(n, false);
- auto alias = Shard(buffer);
-
- for (size_t i=0; i<n; i++) {
- WRec r;
- auto rec = (buffer->get_data() + i);
- r.key = rec->rec.key;
- r.value = rec->rec.value;
-
- auto result = alias.point_lookup(r);
- ck_assert_ptr_nonnull(result);
- ck_assert_int_eq(result->rec.key, r.key);
- ck_assert_int_eq(result->rec.value, r.value);
- }
-
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_point_lookup_miss)
-{
- size_t n = 10000;
-
- auto buffer = create_double_seq_mbuffer<WRec>(n, false);
- auto alias = Shard(buffer);
-
- for (size_t i=n + 100; i<2*n; i++) {
- WRec r;
- r.key = i;
- r.value = i;
-
- auto result = alias.point_lookup(r);
- ck_assert_ptr_null(result);
- }
-
- delete buffer;
-}
-
-START_TEST(t_full_cancelation)
-{
- size_t n = 100;
- auto buffer = create_double_seq_mbuffer<WRec>(n, false);
- auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true);
-
- Shard* shard = new Shard(buffer);
- Shard* shard_ts = new Shard(buffer_ts);
-
- ck_assert_int_eq(shard->get_record_count(), n);
- ck_assert_int_eq(shard->get_tombstone_count(), 0);
- ck_assert_int_eq(shard_ts->get_record_count(), n);
- ck_assert_int_eq(shard_ts->get_tombstone_count(), n);
-
- Shard* shards[] = {shard, shard_ts};
-
- Shard* merged = new Shard(shards, 2);
-
- ck_assert_int_eq(merged->get_tombstone_count(), 0);
- ck_assert_int_eq(merged->get_record_count(), 0);
-
- delete buffer;
- delete buffer_ts;
- delete shard;
- delete shard_ts;
- delete merged;
-}
-END_TEST
-
-
-START_TEST(t_alias_query)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- Shard* shard = new Shard(buffer);
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wss::Parms<WRec> parms = {k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- size_t total_samples = 0;
-
- for (size_t i=0; i<1000; i++) {
- auto state = wss::Query<Shard, WRec>::get_query_state(shard, &parms);
- ((wss::State<WRec> *) state)->sample_size = k;
- auto result = wss::Query<Shard, WRec>::query(shard, state, &parms);
-
- total_samples += result.size();
+typedef WRec R;
+typedef Alias<R> Shard;
- for (size_t j=0; j<result.size(); j++) {
- cnt[result[j].rec.key - 1]++;
- }
-
- wss::Query<Shard, WRec>::delete_query_state(state);
- }
-
- ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05));
-
- gsl_rng_free(parms.rng);
- delete shard;
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_alias_query_merge)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- Shard* shard = new Shard(buffer);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wss::Parms<WRec> parms = {k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- std::vector<std::vector<Wrapped<WRec>>> results(2);
-
- for (size_t i=0; i<1000; i++) {
- auto state1 = wss::Query<Shard, WRec>::get_query_state(shard, &parms);
- ((wss::State<WRec> *) state1)->sample_size = k;
- results[0] = wss::Query<Shard, WRec>::query(shard, state1, &parms);
-
- auto state2 = wss::Query<Shard, WRec>::get_query_state(shard, &parms);
- ((wss::State<WRec> *) state2)->sample_size = k;
- results[1] = wss::Query<Shard, WRec>::query(shard, state2, &parms);
-
- wss::Query<Shard, WRec>::delete_query_state(state1);
- wss::Query<Shard, WRec>::delete_query_state(state2);
- }
-
- auto merged = wss::Query<Shard, WRec>::merge(results, nullptr);
-
- ck_assert_int_eq(merged.size(), 2*k);
- for (size_t i=0; i<merged.size(); i++) {
- ck_assert_int_ge(merged[i].key, lower_key);
- ck_assert_int_le(merged[i].key, upper_key);
- }
-
- gsl_rng_free(parms.rng);
- delete shard;
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_alias_buffer_query_scan)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wss::Parms<WRec> parms = {k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- size_t total_samples = 0;
-
- for (size_t i=0; i<1000; i++) {
- auto state = wss::Query<Shard, WRec, false>::get_buffer_query_state(buffer, &parms);
- ((wss::BufferState<WRec> *) state)->sample_size = k;
- auto result = wss::Query<Shard, WRec, false>::buffer_query(buffer, state, &parms);
- total_samples += result.size();
-
- for (size_t j=0; j<result.size(); j++) {
- cnt[result[j].rec.key - 1]++;
- }
-
- wss::Query<Shard, WRec, false>::delete_buffer_query_state(state);
- }
-
- ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05));
-
- gsl_rng_free(parms.rng);
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_alias_buffer_query_rejection)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wss::Parms<WRec> parms = {k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- size_t total_samples = 0;
-
- for (size_t i=0; i<1000; i++) {
- auto state = wss::Query<Shard, WRec>::get_buffer_query_state(buffer, &parms);
- ((wss::BufferState<WRec> *) state)->sample_size = k;
- auto result = wss::Query<Shard, WRec>::buffer_query(buffer, state, &parms);
-
- total_samples += result.size();
-
- for (size_t j=0; j<result.size(); j++) {
- cnt[result[j].rec.key - 1]++;
- }
-
- wss::Query<Shard, WRec>::delete_buffer_query_state(state);
- }
-
- ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .1));
- ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .1));
- ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .1));
-
- gsl_rng_free(parms.rng);
- delete buffer;
-}
-END_TEST
+#include "include/shard_standard.h"
+#include "include/rangequery.h"
Suite *unit_testing()
{
- Suite *unit = suite_create("Alias Shard Unit Testing");
-
- TCase *create = tcase_create("de::Alias constructor Testing");
- tcase_add_test(create, t_mbuffer_init);
- tcase_add_test(create, t_alias_init);
- tcase_set_timeout(create, 100);
- suite_add_tcase(unit, create);
-
+ Suite *unit = suite_create("ISAMTree Shard Unit Testing");
- TCase *tombstone = tcase_create("de:Alias::tombstone cancellation Testing");
- tcase_add_test(tombstone, t_full_cancelation);
- suite_add_tcase(unit, tombstone);
-
-
- TCase *lookup = tcase_create("de:Alias:point_lookup Testing");
- tcase_add_test(lookup, t_point_lookup);
- tcase_add_test(lookup, t_point_lookup_miss);
- suite_add_tcase(unit, lookup);
-
-
-
- TCase *sampling = tcase_create("de:Alias::AliasQuery Testing");
- tcase_add_test(sampling, t_alias_query);
- tcase_add_test(sampling, t_alias_query_merge);
- tcase_add_test(sampling, t_alias_buffer_query_rejection);
- tcase_add_test(sampling, t_alias_buffer_query_scan);
- suite_add_tcase(unit, sampling);
+ inject_rangequery_tests(unit);
+ inject_shard_tests(unit);
return unit;
}
@@ -389,3 +58,4 @@ int main()
return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
+
diff --git a/tests/augbtree_tests.cpp b/tests/augbtree_tests.cpp
index 1b24344..c7a0885 100644
--- a/tests/augbtree_tests.cpp
+++ b/tests/augbtree_tests.cpp
@@ -1,7 +1,7 @@
/*
- * tests/augbtree_tests.cpp
+ * tests/augbtree_tests.cpp
*
- * Unit tests for AugBTree (Augmented B+Tree) shard
+ * Unit tests for AugBTree (Augmented B+Tree) shard
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
@@ -11,363 +11,23 @@
*/
#include "shard/AugBTree.h"
-#include "query/wirs.h"
-#include "testing.h"
-
+#include "include/testing.h"
#include <check.h>
using namespace de;
-typedef AugBTree<WRec> Shard;
-
-START_TEST(t_mbuffer_init)
-{
- auto buffer = new MutableBuffer<WRec>(1024, 1024);
- for (uint64_t i = 512; i > 0; i--) {
- uint32_t v = i;
- buffer->append({i,v, 1});
- }
-
- for (uint64_t i = 1; i <= 256; ++i) {
- uint32_t v = i;
- buffer->append({i, v, 1}, true);
- }
-
- for (uint64_t i = 257; i <= 512; ++i) {
- uint32_t v = i + 1;
- buffer->append({i, v, 1});
- }
-
- Shard* shard = new Shard(buffer);
- ck_assert_uint_eq(shard->get_record_count(), 512);
-
- delete buffer;
- delete shard;
-}
-
-
-START_TEST(t_wirs_init)
-{
- size_t n = 512;
- auto mbuffer1 = create_test_mbuffer<WRec>(n);
- auto mbuffer2 = create_test_mbuffer<WRec>(n);
- auto mbuffer3 = create_test_mbuffer<WRec>(n);
-
- auto shard1 = new Shard(mbuffer1);
- auto shard2 = new Shard(mbuffer2);
- auto shard3 = new Shard(mbuffer3);
-
- Shard* shards[3] = {shard1, shard2, shard3};
- auto shard4 = new Shard(shards, 3);
-
- ck_assert_int_eq(shard4->get_record_count(), n * 3);
- ck_assert_int_eq(shard4->get_tombstone_count(), 0);
-
- size_t total_cnt = 0;
- size_t shard1_idx = 0;
- size_t shard2_idx = 0;
- size_t shard3_idx = 0;
-
- for (size_t i = 0; i < shard4->get_record_count(); ++i) {
- auto rec1 = shard1->get_record_at(shard1_idx);
- auto rec2 = shard2->get_record_at(shard2_idx);
- auto rec3 = shard3->get_record_at(shard3_idx);
-
- auto cur_rec = shard4->get_record_at(i);
-
- if (shard1_idx < n && cur_rec->rec == rec1->rec) {
- ++shard1_idx;
- } else if (shard2_idx < n && cur_rec->rec == rec2->rec) {
- ++shard2_idx;
- } else if (shard3_idx < n && cur_rec->rec == rec3->rec) {
- ++shard3_idx;
- } else {
- assert(false);
- }
- }
-
- delete mbuffer1;
- delete mbuffer2;
- delete mbuffer3;
-
- delete shard1;
- delete shard2;
- delete shard3;
- delete shard4;
-}
-
-
-START_TEST(t_point_lookup)
-{
- size_t n = 10000;
-
- auto buffer = create_double_seq_mbuffer<WRec>(n, false);
- auto wirs = Shard(buffer);
-
- for (size_t i=0; i<n; i++) {
- WRec r;
- auto rec = (buffer->get_data() + i);
- r.key = rec->rec.key;
- r.value = rec->rec.value;
-
- auto result = wirs.point_lookup(r);
- ck_assert_ptr_nonnull(result);
- ck_assert_int_eq(result->rec.key, r.key);
- ck_assert_int_eq(result->rec.value, r.value);
- }
-
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_point_lookup_miss)
-{
- size_t n = 10000;
-
- auto buffer = create_double_seq_mbuffer<WRec>(n, false);
- auto wirs = Shard(buffer);
-
- for (size_t i=n + 100; i<2*n; i++) {
- WRec r;
- r.key = i;
- r.value = i;
-
- auto result = wirs.point_lookup(r);
- ck_assert_ptr_null(result);
- }
-
- delete buffer;
-}
-
-
-START_TEST(t_full_cancelation)
-{
- size_t n = 100;
- auto buffer = create_double_seq_mbuffer<WRec>(n, false);
- auto buffer_ts = create_double_seq_mbuffer<WRec>(n, true);
-
- Shard* shard = new Shard(buffer);
- Shard* shard_ts = new Shard(buffer_ts);
-
- ck_assert_int_eq(shard->get_record_count(), n);
- ck_assert_int_eq(shard->get_tombstone_count(), 0);
- ck_assert_int_eq(shard_ts->get_record_count(), n);
- ck_assert_int_eq(shard_ts->get_tombstone_count(), n);
-
- Shard* shards[] = {shard, shard_ts};
-
- Shard* merged = new Shard(shards, 2);
-
- ck_assert_int_eq(merged->get_tombstone_count(), 0);
- ck_assert_int_eq(merged->get_record_count(), 0);
-
- delete buffer;
- delete buffer_ts;
- delete shard;
- delete shard_ts;
- delete merged;
-}
-END_TEST
-
-
-START_TEST(t_wirs_query)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- Shard* shard = new Shard(buffer);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wirs::Parms<WRec> parms = {lower_key, upper_key, k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- size_t total_samples = 0;
-
- for (size_t i=0; i<1000; i++) {
- auto state = wirs::Query<Shard, WRec>::get_query_state(shard, &parms);
- ((wirs::State<WRec> *) state)->sample_size = k;
- auto result = wirs::Query<Shard, WRec>::query(shard, state, &parms);
-
- total_samples += result.size();
-
- for (size_t j=0; j<result.size(); j++) {
- cnt[result[j].rec.key - 1]++;
- }
-
- wirs::Query<Shard, WRec>::delete_query_state(state);
- }
-
- ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05));
-
- gsl_rng_free(parms.rng);
- delete shard;
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_wirs_query_merge)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- Shard* shard = new Shard(buffer);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wirs::Parms<WRec> parms = {lower_key, upper_key, k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- std::vector<std::vector<Wrapped<WRec>>> results(2);
-
- for (size_t i=0; i<1000; i++) {
- auto state1 = wirs::Query<Shard, WRec>::get_query_state(shard, &parms);
- ((wirs::State<WRec> *) state1)->sample_size = k;
- results[0] = wirs::Query<Shard, WRec>::query(shard, state1, &parms);
-
- auto state2 = wirs::Query<Shard, WRec>::get_query_state(shard, &parms);
- ((wirs::State<WRec> *) state2)->sample_size = k;
- results[1] = wirs::Query<Shard, WRec>::query(shard, state2, &parms);
-
- wirs::Query<Shard, WRec>::delete_query_state(state1);
- wirs::Query<Shard, WRec>::delete_query_state(state2);
- }
-
- auto merged = wirs::Query<Shard, WRec>::merge(results, nullptr);
-
- ck_assert_int_eq(merged.size(), 2*k);
- for (size_t i=0; i<merged.size(); i++) {
- ck_assert_int_ge(merged[i].key, lower_key);
- ck_assert_int_le(merged[i].key, upper_key);
- }
-
- gsl_rng_free(parms.rng);
- delete shard;
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_wirs_buffer_query_scan)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wirs::Parms<WRec> parms = {lower_key, upper_key, k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- size_t total_samples = 0;
-
- for (size_t i=0; i<1000; i++) {
- auto state = wirs::Query<Shard, WRec, false>::get_buffer_query_state(buffer, &parms);
- ((wirs::BufferState<WRec> *) state)->sample_size = k;
- auto result = wirs::Query<Shard, WRec, false>::buffer_query(buffer, state, &parms);
-
- total_samples += result.size();
-
- for (size_t j=0; j<result.size(); j++) {
- cnt[result[j].rec.key - 1]++;
- }
-
- wirs::Query<Shard, WRec, false>::delete_buffer_query_state(state);
- }
-
- ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05));
-
- gsl_rng_free(parms.rng);
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_wirs_buffer_query_rejection)
-{
- size_t n=1000;
- auto buffer = create_weighted_mbuffer<WRec>(n);
-
- uint64_t lower_key = 0;
- uint64_t upper_key = 5;
-
- size_t k = 1000;
-
- size_t cnt[3] = {0};
- wirs::Parms<WRec> parms = {lower_key, upper_key, k};
- parms.rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- size_t total_samples = 0;
-
- for (size_t i=0; i<1000; i++) {
- auto state = wirs::Query<Shard, WRec>::get_buffer_query_state(buffer, &parms);
- ((wirs::BufferState<WRec> *) state)->sample_size = k;
- auto result = wirs::Query<Shard, WRec>::buffer_query(buffer, state, &parms);
-
- total_samples += result.size();
-
- for (size_t j=0; j<result.size(); j++) {
- cnt[result[j].rec.key - 1]++;
- }
-
- wirs::Query<Shard, WRec>::delete_buffer_query_state(state);
- }
-
- ck_assert(roughly_equal(cnt[0], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[1], (double) total_samples/4.0, total_samples, .05));
- ck_assert(roughly_equal(cnt[2], (double) total_samples/2.0, total_samples, .05));
-
- gsl_rng_free(parms.rng);
- delete buffer;
-}
-END_TEST
+typedef WRec R;
+typedef AugBTree<R> Shard;
+#include "include/shard_standard.h"
+#include "include/rangequery.h"
Suite *unit_testing()
{
- Suite *unit = suite_create("AugBTree Shard Unit Testing");
-
- TCase *create = tcase_create("de::AugBTree constructor Testing");
- tcase_add_test(create, t_mbuffer_init);
- tcase_add_test(create, t_wirs_init);
- tcase_set_timeout(create, 100);
- suite_add_tcase(unit, create);
-
-
- TCase *tombstone = tcase_create("de:AugBTree::tombstone cancellation Testing");
- tcase_add_test(tombstone, t_full_cancelation);
- suite_add_tcase(unit, tombstone);
-
-
- TCase *lookup = tcase_create("de:AugBTree:point_lookup Testing");
- tcase_add_test(lookup, t_point_lookup);
- tcase_add_test(lookup, t_point_lookup_miss);
- suite_add_tcase(unit, lookup);
-
+ Suite *unit = suite_create("Alias-augmented B+Tree Shard Unit Testing");
- TCase *sampling = tcase_create("de:AugBTree::AugBTreeQuery Testing");
- tcase_add_test(sampling, t_wirs_query);
- tcase_add_test(sampling, t_wirs_query_merge);
- tcase_add_test(sampling, t_wirs_buffer_query_rejection);
- tcase_add_test(sampling, t_wirs_buffer_query_scan);
- suite_add_tcase(unit, sampling);
+ inject_rangequery_tests(unit);
+ inject_shard_tests(unit);
return unit;
}
diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp
index 40605c4..2039efb 100644
--- a/tests/de_level_concurrent.cpp
+++ b/tests/de_level_concurrent.cpp
@@ -17,12 +17,12 @@
#include "framework/DynamicExtension.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
-#include "shard/TrieSpline.h"
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+typedef Rec R;
+typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
#include "include/concurrent_extension.h"
diff --git a/tests/de_level_tag.cpp b/tests/de_level_tag.cpp
index 2ff2d26..75131c4 100644
--- a/tests/de_level_tag.cpp
+++ b/tests/de_level_tag.cpp
@@ -21,7 +21,8 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::LEVELING, DeletePolicy::TAGGING, SerialScheduler> DE;
+typedef Rec R;
+typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::LEVELING, DeletePolicy::TAGGING, SerialScheduler> DE;
#include "include/dynamic_extension.h"
diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp
index 9b30ac0..6da211d 100644
--- a/tests/de_level_tomb.cpp
+++ b/tests/de_level_tomb.cpp
@@ -22,7 +22,8 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef Rec R;
+typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
#include "include/dynamic_extension.h"
diff --git a/tests/de_tier_concurrent.cpp b/tests/de_tier_concurrent.cpp
index 418332b..722b9bd 100644
--- a/tests/de_tier_concurrent.cpp
+++ b/tests/de_tier_concurrent.cpp
@@ -21,7 +21,8 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+typedef Rec R;
+typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
#include "include/concurrent_extension.h"
diff --git a/tests/de_tier_tag.cpp b/tests/de_tier_tag.cpp
index 83c37af..79bb7bf 100644
--- a/tests/de_tier_tag.cpp
+++ b/tests/de_tier_tag.cpp
@@ -22,7 +22,8 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE;
+typedef Rec R;
+typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE;
#include "include/dynamic_extension.h"
diff --git a/tests/de_tier_tomb.cpp b/tests/de_tier_tomb.cpp
index 58a7a0f..b1387bb 100644
--- a/tests/de_tier_tomb.cpp
+++ b/tests/de_tier_tomb.cpp
@@ -22,7 +22,8 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<Rec, ISAMTree<Rec>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef Rec R;
+typedef DynamicExtension<Rec, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
#include "include/dynamic_extension.h"
diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h
index 24cb2ce..0993fac 100644
--- a/tests/include/concurrent_extension.h
+++ b/tests/include/concurrent_extension.h
@@ -8,8 +8,8 @@
* Distributed under the Modified BSD License.
*
* WARNING: This file must be included in the main unit test set
- * after the definition of an appropriate Shard, Query, and Rec
- * type. In particular, Rec needs to implement the key-value
+ * after the definition of an appropriate Shard, Query, and R
+ * type. In particular, R needs to implement the key-value
* pair interface. For other types of record, you'll need to
* use a different set of unit tests.
*/
@@ -30,7 +30,7 @@
#include <check.h>
//using namespace de;
-//typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+//typedef DynamicExtension<R, ISAMTree<R>, rq::Query<ISAMTree<R>, R>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
*/
@@ -54,7 +54,7 @@ START_TEST(t_insert)
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<100; i++) {
- Rec r = {key, val};
+ R r = {key, val};
ck_assert_int_eq(test_de->insert(r), 1);
key++;
val++;
@@ -75,7 +75,7 @@ START_TEST(t_debug_insert)
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<1000; i++) {
- Rec r = {key, val};
+ R r = {key, val};
ck_assert_int_eq(test_de->insert(r), 1);
ck_assert_int_eq(test_de->get_record_count(), i+1);
key++;
@@ -94,7 +94,7 @@ START_TEST(t_insert_with_mem_merges)
uint64_t key = 0;
uint32_t val = 0;
- Rec r = {key, val};
+ R r = {key, val};
for (size_t i=0; i<1000; i++) {
ck_assert_int_eq(test_de->insert(r), 1);
r.key++;
@@ -148,7 +148,7 @@ START_TEST(t_range_query)
size_t i=0;
while ( i < keys.size()) {
- Rec r = {keys[i], (uint32_t) i};
+ R r = {keys[i], (uint32_t) i};
if (test_de->insert(r)) {
i++;
} else {
@@ -166,7 +166,7 @@ START_TEST(t_range_query)
uint64_t lower_key = keys[idx];
uint64_t upper_key = keys[idx + 250];
- rq::Parms<Rec> p;
+ rq::Parms<R> p;
p.lower_bound = lower_key;
p.upper_bound = upper_key;
@@ -210,7 +210,7 @@ START_TEST(t_tombstone_merging_01)
size_t deletes = 0;
size_t cnt=0;
for (auto rec : records) {
- Rec r = {rec.first, rec.second};
+ R r = {rec.first, rec.second};
while (!test_de->insert(r)) {
_mm_pause();
}
@@ -220,7 +220,7 @@ START_TEST(t_tombstone_merging_01)
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
- Rec dr = {del_vec[i].first, del_vec[i].second};
+ R dr = {del_vec[i].first, del_vec[i].second};
while (!test_de->erase(dr)) {
_mm_pause();
}
@@ -249,9 +249,9 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
auto test_de = new DE(1000, 10000, 2);
- std::set<Rec> records;
- std::set<Rec> to_delete;
- std::set<Rec> deleted;
+ std::set<R> records;
+ std::set<R> to_delete;
+ std::set<R> deleted;
while (records.size() < reccnt) {
uint64_t key = rand();
@@ -267,7 +267,7 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
ck_assert_int_eq(test_de->insert(rec), 1);
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
- std::vector<Rec> del_vec;
+ std::vector<R> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
@@ -295,9 +295,9 @@ START_TEST(t_static_structure)
size_t reccnt = 100000;
auto test_de = new DE(100, 1000, 2);
- std::set<Rec> records;
- std::set<Rec> to_delete;
- std::set<Rec> deleted;
+ std::set<R> records;
+ std::set<R> to_delete;
+ std::set<R> deleted;
while (records.size() < reccnt) {
uint64_t key = rand();
@@ -319,7 +319,7 @@ START_TEST(t_static_structure)
t_reccnt++;
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
- std::vector<Rec> del_vec;
+ std::vector<R> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
@@ -339,11 +339,11 @@ START_TEST(t_static_structure)
}
- //fprintf(stderr, "Tombstones: %ld\tRecords: %ld\n", test_de->get_tombstone_count(), test_de->get_record_count());
+ //fprintf(stderr, "Tombstones: %ld\tRecords: %ld\n", test_de->get_tombstone_count(), test_de->get_record_count());
//fprintf(stderr, "Inserts: %ld\tDeletes:%ld\tNet:%ld\n", reccnt, deletes, reccnt - deletes);
auto flat = test_de->create_static_structure(true);
- //fprintf(stderr, "Flat: Tombstones: %ld\tRecords %ld\n", flat->get_tombstone_count(), flat->get_record_count());
+ //fprintf(stderr, "Flat: Tombstones: %ld\tRecords %ld\n", flat->get_tombstone_count(), flat->get_record_count());
//ck_assert_int_eq(flat->get_record_count(), reccnt - deletes);
uint64_t prev_key = 0;
diff --git a/tests/include/dynamic_extension.h b/tests/include/dynamic_extension.h
index 5a08f5a..f0f13dd 100644
--- a/tests/include/dynamic_extension.h
+++ b/tests/include/dynamic_extension.h
@@ -8,8 +8,8 @@
* Distributed under the Modified BSD License.
*
* WARNING: This file must be included in the main unit test set
- * after the definition of an appropriate Shard, Query, and Rec
- * type. In particular, Rec needs to implement the key-value
+ * after the definition of an appropriate Shard, Query, and R
+ * type. In particular, R needs to implement the key-value
* pair interface. For other types of record, you'll need to
* use a different set of unit tests.
*/
@@ -29,7 +29,7 @@
//#include "query/rangequery.h"
//#include <check.h>
//using namespace de;
-//typedef DynamicExtension<Rec, ISAMTree<Rec>, rq::Query<ISAMTree<Rec>, Rec>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE;
+//typedef DynamicExtension<R, ISAMTree<R>, rq::Query<ISAMTree<R>, R>, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE;
START_TEST(t_create)
@@ -52,7 +52,7 @@ START_TEST(t_insert)
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<100; i++) {
- Rec r = {key, val};
+ R r = {key, val};
ck_assert_int_eq(test_de->insert(r), 1);
key++;
val++;
@@ -73,7 +73,7 @@ START_TEST(t_debug_insert)
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<1000; i++) {
- Rec r = {key, val};
+ R r = {key, val};
ck_assert_int_eq(test_de->insert(r), 1);
ck_assert_int_eq(test_de->get_record_count(), i+1);
key++;
@@ -92,7 +92,7 @@ START_TEST(t_insert_with_mem_merges)
uint64_t key = 0;
uint32_t val = 0;
for (size_t i=0; i<300; i++) {
- Rec r = {key, val};
+ R r = {key, val};
ck_assert_int_eq(test_de->insert(r), 1);
key++;
val++;
@@ -123,7 +123,7 @@ START_TEST(t_range_query)
std::shuffle(keys.begin(), keys.end(), gen);
for (size_t i=0; i<keys.size(); i++) {
- Rec r = {keys[i], (uint32_t) i};
+ R r = {keys[i], (uint32_t) i};
ck_assert_int_eq(test_de->insert(r), 1);
}
@@ -136,7 +136,7 @@ START_TEST(t_range_query)
uint64_t lower_key = keys[idx];
uint64_t upper_key = keys[idx + 250];
- rq::Parms<Rec> p;
+ rq::Parms<R> p;
p.lower_bound = lower_key;
p.upper_bound = upper_key;
@@ -177,7 +177,7 @@ START_TEST(t_tombstone_merging_01)
size_t deletes = 0;
size_t cnt=0;
for (auto rec : records) {
- Rec r = {rec.first, rec.second};
+ R r = {rec.first, rec.second};
ck_assert_int_eq(test_de->insert(r), 1);
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
@@ -185,7 +185,7 @@ START_TEST(t_tombstone_merging_01)
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
- Rec dr = {del_vec[i].first, del_vec[i].second};
+ R dr = {del_vec[i].first, del_vec[i].second};
test_de->erase(dr);
deletes++;
to_delete.erase(del_vec[i]);
@@ -212,9 +212,9 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
auto test_de = new DE(1000, 10000, 2);
- std::set<Rec> records;
- std::set<Rec> to_delete;
- std::set<Rec> deleted;
+ std::set<R> records;
+ std::set<R> to_delete;
+ std::set<R> deleted;
while (records.size() < reccnt) {
uint64_t key = rand();
@@ -230,7 +230,7 @@ DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
ck_assert_int_eq(test_de->insert(rec), 1);
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
- std::vector<Rec> del_vec;
+ std::vector<R> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
@@ -258,9 +258,9 @@ START_TEST(t_static_structure)
size_t reccnt = 100000;
auto test_de = new DE(100, 1000, 2);
- std::set<Rec> records;
- std::set<Rec> to_delete;
- std::set<Rec> deleted;
+ std::set<R> records;
+ std::set<R> to_delete;
+ std::set<R> deleted;
while (records.size() < reccnt) {
uint64_t key = rand();
@@ -280,7 +280,7 @@ START_TEST(t_static_structure)
t_reccnt++;
if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
- std::vector<Rec> del_vec;
+ std::vector<R> del_vec;
std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
for (size_t i=0; i<del_vec.size(); i++) {
diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h
index 471af27..fdd66d9 100644
--- a/tests/include/rangecount.h
+++ b/tests/include/rangecount.h
@@ -9,8 +9,8 @@
* Distributed under the Modified BSD License.
*
* WARNING: This file must be included in the main unit test set
- * after the definition of an appropriate Shard and Rec
- * type. In particular, Rec needs to implement the key-value
+ * after the definition of an appropriate Shard and R
+ * type. In particular, R needs to implement the key-value
* pair interface and Shard needs to support lower_bound.
* For other types of record and shard, you'll need to
* use a different set of unit tests.
@@ -29,21 +29,24 @@
//#include "testing.h"
//#include <check.h>
//using namespace de;
-//typedef ISAMTree<Rec> Shard;
+//typedef ISAMTree<R> Shard;
+
+
+#include "query/rangecount.h"
START_TEST(t_range_count)
{
- auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto buffer = create_sequential_mbuffer<R>(100, 1000);
auto shard = Shard(buffer->get_buffer_view());
- rc::Parms<Rec> parms;
+ rc::Parms<R> parms;
parms.lower_bound = 300;
parms.upper_bound = 500;
- auto state = rc::Query<Rec, Shard>::get_query_state(&shard, &parms);
- auto result = rc::Query<Rec, Shard>::query(&shard, state, &parms);
- rc::Query<Rec, Shard>::delete_query_state(state);
+ auto state = rc::Query<R, Shard>::get_query_state(&shard, &parms);
+ auto result = rc::Query<R, Shard>::query(&shard, state, &parms);
+ rc::Query<R, Shard>::delete_query_state(state);
ck_assert_int_eq(result.size(), 1);
ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1);
@@ -55,17 +58,17 @@ END_TEST
START_TEST(t_buffer_range_count)
{
- auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto buffer = create_sequential_mbuffer<R>(100, 1000);
- rc::Parms<Rec> parms;
+ rc::Parms<R> parms;
parms.lower_bound = 300;
parms.upper_bound = 500;
{
auto view = buffer->get_buffer_view();
- auto state = rc::Query<Rec, Shard>::get_buffer_query_state(&view, &parms);
- auto result = rc::Query<Rec, Shard>::buffer_query(state, &parms);
- rc::Query<Rec, Shard>::delete_buffer_query_state(state);
+ auto state = rc::Query<R, Shard>::get_buffer_query_state(&view, &parms);
+ auto result = rc::Query<R, Shard>::buffer_query(state, &parms);
+ rc::Query<R, Shard>::delete_buffer_query_state(state);
ck_assert_int_eq(result.size(), 1);
ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1);
@@ -78,32 +81,32 @@ END_TEST
START_TEST(t_range_count_merge)
{
- auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
- auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
auto shard1 = Shard(buffer1->get_buffer_view());
auto shard2 = Shard(buffer2->get_buffer_view());
- rc::Parms<Rec> parms;
+ rc::Parms<R> parms;
parms.lower_bound = 150;
parms.upper_bound = 500;
size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200;
- auto state1 = rc::Query<Rec, Shard>::get_query_state(&shard1, &parms);
- auto state2 = rc::Query<Rec, Shard>::get_query_state(&shard2, &parms);
+ auto state1 = rc::Query<R, Shard>::get_query_state(&shard1, &parms);
+ auto state2 = rc::Query<R, Shard>::get_query_state(&shard2, &parms);
- std::vector<std::vector<de::Wrapped<Rec>>> results(2);
- results[0] = rc::Query<Rec, Shard>::query(&shard1, state1, &parms);
- results[1] = rc::Query<Rec, Shard>::query(&shard2, state2, &parms);
+ std::vector<std::vector<de::Wrapped<R>>> results(2);
+ results[0] = rc::Query<R, Shard>::query(&shard1, state1, &parms);
+ results[1] = rc::Query<R, Shard>::query(&shard2, state2, &parms);
- rc::Query<Rec, Shard>::delete_query_state(state1);
- rc::Query<Rec, Shard>::delete_query_state(state2);
+ rc::Query<R, Shard>::delete_query_state(state1);
+ rc::Query<R, Shard>::delete_query_state(state2);
ck_assert_int_eq(results[0].size(), 1);
ck_assert_int_eq(results[1].size(), 1);
- auto result = rc::Query<Rec, Shard>::merge(results, nullptr);
+ auto result = rc::Query<R, Shard>::merge(results, nullptr);
ck_assert_int_eq(result[0].key, result_size);
@@ -115,8 +118,8 @@ END_TEST
START_TEST(t_lower_bound)
{
- auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
- auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
auto shard1 = new Shard(buffer1->get_buffer_view());
auto shard2 = new Shard(buffer2->get_buffer_view());
@@ -126,7 +129,7 @@ START_TEST(t_lower_bound)
auto merged = Shard(shards);
for (size_t i=100; i<1000; i++) {
- Rec r;
+ R r;
r.key = i;
r.value = i;
diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h
index dbb71db..a8a73f7 100644
--- a/tests/include/rangequery.h
+++ b/tests/include/rangequery.h
@@ -9,8 +9,8 @@
* Distributed under the Modified BSD License.
*
* WARNING: This file must be included in the main unit test set
- * after the definition of an appropriate Shard and Rec
- * type. In particular, Rec needs to implement the key-value
+ * after the definition of an appropriate Shard and R
+ * type. In particular, R needs to implement the key-value
* pair interface and Shard needs to support lower_bound.
* For other types of record and shard, you'll need to
* use a different set of unit tests.
@@ -29,21 +29,23 @@
//#include "testing.h"
//#include <check.h>
//using namespace de;
-//typedef ISAMTree<Rec> Shard;
+//typedef ISAMTree<R> Shard;
+
+#include "query/rangequery.h"
START_TEST(t_range_query)
{
- auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto buffer = create_sequential_mbuffer<R>(100, 1000);
auto shard = Shard(buffer->get_buffer_view());
- rq::Parms<Rec> parms;
+ rq::Parms<R> parms;
parms.lower_bound = 300;
parms.upper_bound = 500;
- auto state = rq::Query<Rec, Shard>::get_query_state(&shard, &parms);
- auto result = rq::Query<Rec, Shard>::query(&shard, state, &parms);
- rq::Query<Rec, Shard>::delete_query_state(state);
+ auto state = rq::Query<R, Shard>::get_query_state(&shard, &parms);
+ auto result = rq::Query<R, Shard>::query(&shard, state, &parms);
+ rq::Query<R, Shard>::delete_query_state(state);
ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
for (size_t i=0; i<result.size(); i++) {
@@ -58,17 +60,17 @@ END_TEST
START_TEST(t_buffer_range_query)
{
- auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto buffer = create_sequential_mbuffer<R>(100, 1000);
- rq::Parms<Rec> parms;
+ rq::Parms<R> parms;
parms.lower_bound = 300;
parms.upper_bound = 500;
{
auto view = buffer->get_buffer_view();
- auto state = rq::Query<Rec, Shard>::get_buffer_query_state(&view, &parms);
- auto result = rq::Query<Rec, Shard>::buffer_query(state, &parms);
- rq::Query<Rec, Shard>::delete_buffer_query_state(state);
+ auto state = rq::Query<R, Shard>::get_buffer_query_state(&view, &parms);
+ auto result = rq::Query<R, Shard>::buffer_query(state, &parms);
+ rq::Query<R, Shard>::delete_buffer_query_state(state);
ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
for (size_t i=0; i<result.size(); i++) {
@@ -84,40 +86,40 @@ END_TEST
START_TEST(t_range_query_merge)
{
- auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
- auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
auto shard1 = Shard(buffer1->get_buffer_view());
auto shard2 = Shard(buffer2->get_buffer_view());
- rq::Parms<Rec> parms;
+ rq::Parms<R> parms;
parms.lower_bound = 150;
parms.upper_bound = 500;
size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200;
- auto state1 = rq::Query<Rec, Shard>::get_query_state(&shard1, &parms);
- auto state2 = rq::Query<Rec, Shard>::get_query_state(&shard2, &parms);
+ auto state1 = rq::Query<R, Shard>::get_query_state(&shard1, &parms);
+ auto state2 = rq::Query<R, Shard>::get_query_state(&shard2, &parms);
- std::vector<std::vector<de::Wrapped<Rec>>> results(2);
- results[0] = rq::Query<Rec, Shard>::query(&shard1, state1, &parms);
- results[1] = rq::Query<Rec, Shard>::query(&shard2, state2, &parms);
+ std::vector<std::vector<de::Wrapped<R>>> results(2);
+ results[0] = rq::Query<R, Shard>::query(&shard1, state1, &parms);
+ results[1] = rq::Query<R, Shard>::query(&shard2, state2, &parms);
- rq::Query<Rec, Shard>::delete_query_state(state1);
- rq::Query<Rec, Shard>::delete_query_state(state2);
+ rq::Query<R, Shard>::delete_query_state(state1);
+ rq::Query<R, Shard>::delete_query_state(state2);
ck_assert_int_eq(results[0].size() + results[1].size(), result_size);
- std::vector<std::vector<Wrapped<Rec>>> proc_results;
+ std::vector<std::vector<Wrapped<R>>> proc_results;
for (size_t j=0; j<results.size(); j++) {
- proc_results.emplace_back(std::vector<Wrapped<Rec>>());
+ proc_results.emplace_back(std::vector<Wrapped<R>>());
for (size_t i=0; i<results[j].size(); i++) {
proc_results[j].emplace_back(results[j][i]);
}
}
- auto result = rq::Query<Rec, Shard>::merge(proc_results, nullptr);
+ auto result = rq::Query<R, Shard>::merge(proc_results, nullptr);
std::sort(result.begin(), result.end());
ck_assert_int_eq(result.size(), result_size);
@@ -137,8 +139,8 @@ END_TEST
START_TEST(t_lower_bound)
{
- auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
- auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
auto shard1 = new Shard(buffer1->get_buffer_view());
auto shard2 = new Shard(buffer2->get_buffer_view());
@@ -148,7 +150,7 @@ START_TEST(t_lower_bound)
auto merged = Shard(shards);
for (size_t i=100; i<1000; i++) {
- Rec r;
+ R r;
r.key = i;
r.value = i;
diff --git a/tests/include/shard_standard.h b/tests/include/shard_standard.h
index ddd7614..f50c1cb 100644
--- a/tests/include/shard_standard.h
+++ b/tests/include/shard_standard.h
@@ -8,8 +8,8 @@
* Distributed under the Modified BSD License.
*
* WARNING: This file must be included in the main unit test set
- * after the definition of an appropriate Shard and Rec
- * type. In particular, Rec needs to implement the key-value
+ * after the definition of an appropriate Shard and R
+ * type. In particular, R needs to implement the key-value
* pair interface. For other types of record, you'll need to
* use a different set of unit tests.
*/
@@ -26,11 +26,11 @@
//#include "testing.h"
//#include <check.h>
//using namespace de;
-//typedef ISAMTree<Rec> Shard;
+//typedef ISAMTree<R> Shard;
START_TEST(t_mbuffer_init)
{
- auto buffer = new MutableBuffer<Rec>(512, 1024);
+ auto buffer = new MutableBuffer<R>(512, 1024);
for (uint64_t i = 512; i > 0; i--) {
uint32_t v = i;
buffer->append({i,v, 1});
@@ -57,9 +57,9 @@ START_TEST(t_mbuffer_init)
START_TEST(t_shard_init)
{
size_t n = 512;
- auto mbuffer1 = create_test_mbuffer<Rec>(n);
- auto mbuffer2 = create_test_mbuffer<Rec>(n);
- auto mbuffer3 = create_test_mbuffer<Rec>(n);
+ auto mbuffer1 = create_test_mbuffer<R>(n);
+ auto mbuffer2 = create_test_mbuffer<R>(n);
+ auto mbuffer3 = create_test_mbuffer<R>(n);
auto shard1 = new Shard(mbuffer1->get_buffer_view());
auto shard2 = new Shard(mbuffer2->get_buffer_view());
@@ -108,8 +108,8 @@ START_TEST(t_shard_init)
START_TEST(t_full_cancelation)
{
size_t n = 100;
- auto buffer = create_double_seq_mbuffer<Rec>(n, false);
- auto buffer_ts = create_double_seq_mbuffer<Rec>(n, true);
+ auto buffer = create_double_seq_mbuffer<R>(n, false);
+ auto buffer_ts = create_double_seq_mbuffer<R>(n, true);
Shard* shard = new Shard(buffer->get_buffer_view());
Shard* shard_ts = new Shard(buffer_ts->get_buffer_view());
@@ -139,14 +139,14 @@ START_TEST(t_point_lookup)
{
size_t n = 10000;
- auto buffer = create_double_seq_mbuffer<Rec>(n, false);
+ auto buffer = create_double_seq_mbuffer<R>(n, false);
auto isam = Shard(buffer->get_buffer_view());
{
auto view = buffer->get_buffer_view();
for (size_t i=0; i<n; i++) {
- Rec r;
+ R r;
auto rec = view.get(i);
r.key = rec->rec.key;
r.value = rec->rec.value;
@@ -167,11 +167,11 @@ START_TEST(t_point_lookup_miss)
{
size_t n = 10000;
- auto buffer = create_double_seq_mbuffer<Rec>(n, false);
+ auto buffer = create_double_seq_mbuffer<R>(n, false);
auto isam = Shard(buffer->get_buffer_view());
for (size_t i=n + 100; i<2*n; i++) {
- Rec r;
+ R r;
r.key = i;
r.value = i;
diff --git a/tests/include/wirs.h b/tests/include/wirs.h
new file mode 100644
index 0000000..90cd22d
--- /dev/null
+++ b/tests/include/wirs.h
@@ -0,0 +1,181 @@
+/*
+ * tests/include/wirs.h
+ *
+ * Standardized unit tests for range queries against supporting
+ * shard types
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * WARNING: This file must be included in the main unit test set
+ * after the definition of an appropriate Shard and R
+ * type. In particular, R needs to implement the key-value
+ * pair interface and Shard needs to support lower_bound.
+ * For other types of record and shard, you'll need to
+ * use a different set of unit tests.
+ */
+#pragma once
+
+/*
+ * Uncomment these lines temporarily to remove errors in this file
+ * temporarily for development purposes. They should be removed prior
+ * to building, to ensure no duplicate definitions. These includes/defines
+ * should be included in the source file that includes this one, above the
+ * include statement.
+ */
+//#include "shard/ISAMTree.h"
+//#include "query/rangequery.h"
+//#include "testing.h"
+//#include <check.h>
+//using namespace de;
+//typedef ISAMTree<R> Shard;
+
+
+START_TEST(t_range_query) // Shard-level rq query over [300, 500]: checks the result count and that every key is in bounds.
+{
+ auto buffer = create_sequential_mbuffer<R>(100, 1000); // presumably sequential keys starting at 100 — TODO confirm against testing.h
+ auto shard = Shard(buffer->get_buffer_view());
+
+ rq::Parms<R> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = rq::Query<R, Shard>::get_query_state(&shard, &parms);
+ auto result = rq::Query<R, Shard>::query(&shard, state, &parms);
+ rq::Query<R, Shard>::delete_query_state(state); // state is released as soon as the query has returned
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); // inclusive bounds: expects one record per key
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_buffer_range_query) // Same [300, 500] rq query, but run against a buffer view instead of a shard.
+{
+ auto buffer = create_sequential_mbuffer<R>(100, 1000); // presumably sequential keys starting at 100 — TODO confirm against testing.h
+
+ rq::Parms<R> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ { // the view lives in this inner scope, so it is destroyed before the buffer is deleted below
+ auto view = buffer->get_buffer_view();
+ auto state = rq::Query<R, Shard>::get_buffer_query_state(&view, &parms);
+ auto result = rq::Query<R, Shard>::buffer_query(state, &parms);
+ rq::Query<R, Shard>::delete_buffer_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); // inclusive bounds: expects one record per key
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_range_query_merge) // Queries two shards with a key gap between them, merges the per-shard results and checks the merged key sequence.
+{
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
+
+ auto shard1 = Shard(buffer1->get_buffer_view());
+ auto shard2 = Shard(buffer2->get_buffer_view());
+
+ rq::Parms<R> parms;
+ parms.lower_bound = 150;
+ parms.upper_bound = 500;
+
+ size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; // the 200 subtracted keys are presumably 200..399, held by neither buffer — TODO confirm
+
+ auto state1 = rq::Query<R, Shard>::get_query_state(&shard1, &parms);
+ auto state2 = rq::Query<R, Shard>::get_query_state(&shard2, &parms);
+
+ std::vector<std::vector<de::Wrapped<R>>> results(2); // one result vector per shard
+ results[0] = rq::Query<R, Shard>::query(&shard1, state1, &parms);
+ results[1] = rq::Query<R, Shard>::query(&shard2, state2, &parms);
+
+ rq::Query<R, Shard>::delete_query_state(state1);
+ rq::Query<R, Shard>::delete_query_state(state2);
+
+ ck_assert_int_eq(results[0].size() + results[1].size(), result_size);
+
+ std::vector<std::vector<Wrapped<R>>> proc_results; // element-by-element copy of results, used as the merge input
+
+ for (size_t j=0; j<results.size(); j++) {
+ proc_results.emplace_back(std::vector<Wrapped<R>>());
+ for (size_t i=0; i<results[j].size(); i++) {
+ proc_results[j].emplace_back(results[j][i]);
+ }
+ }
+
+ auto result = rq::Query<R, Shard>::merge(proc_results, nullptr);
+ std::sort(result.begin(), result.end()); // sorted so the loop below can compare against consecutive expected keys
+
+ ck_assert_int_eq(result.size(), result_size);
+ auto key = parms.lower_bound;
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_eq(key++, result[i].key);
+ if (key == 200) { // jump the expected key over the gap between the two buffers
+ key = 400;
+ }
+ }
+
+ delete buffer1;
+ delete buffer2;
+}
+END_TEST
+
+
+START_TEST(t_lower_bound) // Merges two shards into one and checks get_lower_bound for every key in 100..999.
+{
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
+
+ auto shard1 = new Shard(buffer1->get_buffer_view());
+ auto shard2 = new Shard(buffer2->get_buffer_view());
+
+ std::vector<Shard*> shards = {shard1, shard2};
+
+ auto merged = Shard(shards); // shard built from the two source shards
+
+ for (size_t i=100; i<1000; i++) {
+ R r; // NOTE(review): r is filled in below but never read in this test
+ r.key = i;
+ r.value = i;
+
+ auto idx = merged.get_lower_bound(i);
+
+ assert(idx < merged.get_record_count()); // NOTE(review): plain assert rather than a ck_assert_* check
+
+ auto res = merged.get_record_at(idx);
+
+ if (i >=200 && i <400) { // range treated separately: the record found must have a key below i
+ ck_assert_int_lt(res->rec.key, i);
+ } else { // everywhere else the record found must match i exactly
+ ck_assert_int_eq(res->rec.key, i);
+ }
+ }
+
+ delete buffer1;
+ delete buffer2;
+ delete shard1;
+ delete shard2;
+}
+END_TEST
+
+static void inject_rangequery_tests(Suite *suite) { // Adds a "Range Query Testing" test case with three tests to suite.
+ TCase *range_query = tcase_create("Range Query Testing");
+ tcase_add_test(range_query, t_range_query);
+ tcase_add_test(range_query, t_buffer_range_query);
+ tcase_add_test(range_query, t_range_query_merge); // NOTE(review): t_lower_bound is defined in this file but is not registered here
+ suite_add_tcase(suite, range_query);
+}
diff --git a/tests/include/wss.h b/tests/include/wss.h
new file mode 100644
index 0000000..f0ac74c
--- /dev/null
+++ b/tests/include/wss.h
@@ -0,0 +1,144 @@
+/*
+ * tests/include/wss.h
+ *
+ * Standardized unit tests for weighted sampling (wss) queries against
+ * supporting shard types
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * WARNING: This file must be included in the main unit test set
+ * after the definition of an appropriate Shard and R
+ * type. In particular, R needs to implement the key-value
+ * pair interface and Shard needs to support lower_bound.
+ * For other types of record and shard, you'll need to
+ * use a different set of unit tests.
+ */
+#pragma once
+
+/*
+ * Uncomment these lines temporarily to remove errors in this file
+ * temporarily for development purposes. They should be removed prior
+ * to building, to ensure no duplicate definitions. These includes/defines
+ * should be included in the source file that includes this one, above the
+ * include statement.
+ */
+#include "shard/Alias.h"
+#include "testing.h"
+#include <check.h>
+using namespace de;
+typedef Alias<R> Shard;
+
+#include "query/wss.h"
+
+START_TEST(t_wss_query) // Runs a wss query for 20 samples against a shard built from a weighted buffer.
+{
+ auto buffer = create_weighted_mbuffer<R>(1000);
+ auto shard = Shard(buffer->get_buffer_view());
+
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937); // freed at the end of the test
+
+ wss::Parms<R> parms;
+ parms.rng = rng;
+ parms.sample_size = 20;
+
+ auto state = wss::Query<R, Shard>::get_query_state(&shard, &parms);
+ auto result = wss::Query<R, Shard>::query(&shard, state, &parms); // NOTE(review): result is never inspected; the test has no assertions
+ wss::Query<R, Shard>::delete_query_state(state);
+
+ delete buffer;
+ gsl_rng_free(rng);
+}
+END_TEST
+
+
+START_TEST(t_buffer_wss_query) // Runs a wss query against a buffer view and checks the number of samples returned.
+{
+ auto buffer = create_weighted_mbuffer<R>(1000);
+
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937);
+
+ wss::Parms<R> parms;
+ parms.rng = rng;
+ // Set explicitly (same value as t_wss_query): the assertion below reads it.
+ parms.sample_size = 20;
+
+ { // the view lives in this inner scope, so it is destroyed before the buffer is deleted below
+ auto view = buffer->get_buffer_view();
+ auto state = wss::Query<R, Shard>::get_buffer_query_state(&view, &parms);
+ auto result = wss::Query<R, Shard>::buffer_query(state, &parms);
+ wss::Query<R, Shard>::delete_buffer_query_state(state);
+
+ // NOTE(review): assumes buffer_query yields exactly parms.sample_size records with no extra setup — confirm
+ ck_assert_int_eq(result.size(), parms.sample_size);
+ }
+
+ delete buffer;
+ gsl_rng_free(rng); // release the generator, as t_wss_query does
+}
+END_TEST
+
+
+/*
+START_TEST(t_range_query_merge)
+{
+ auto buffer1 = create_sequential_mbuffer<R>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<R>(400, 1000);
+
+ auto shard1 = Shard(buffer1->get_buffer_view());
+ auto shard2 = Shard(buffer2->get_buffer_view());
+
+ wss::Parms<R> parms;
+ parms.lower_bound = 150;
+ parms.upper_bound = 500;
+
+ size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200;
+
+ auto state1 = wss::Query<R, Shard>::get_query_state(&shard1, &parms);
+ auto state2 = wss::Query<R, Shard>::get_query_state(&shard2, &parms);
+
+ std::vector<std::vector<de::Wrapped<R>>> results(2);
+ results[0] = wss::Query<R, Shard>::query(&shard1, state1, &parms);
+ results[1] = wss::Query<R, Shard>::query(&shard2, state2, &parms);
+
+ wss::Query<R, Shard>::delete_query_state(state1);
+ wss::Query<R, Shard>::delete_query_state(state2);
+
+ ck_assert_int_eq(results[0].size() + results[1].size(), result_size);
+
+ std::vector<std::vector<Wrapped<R>>> proc_results;
+
+ for (size_t j=0; j<results.size(); j++) {
+ proc_results.emplace_back(std::vector<Wrapped<R>>());
+ for (size_t i=0; i<results[j].size(); i++) {
+ proc_results[j].emplace_back(results[j][i]);
+ }
+ }
+
+ auto result = wss::Query<R, Shard>::merge(proc_results, nullptr);
+ std::sort(result.begin(), result.end());
+
+ ck_assert_int_eq(result.size(), result_size);
+ auto key = parms.lower_bound;
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_eq(key++, result[i].key);
+ if (key == 200) {
+ key = 400;
+ }
+ }
+
+ delete buffer1;
+ delete buffer2;
+}
+END_TEST
+*/
+
+
+static void inject_wss_tests(Suite *suite) { // Adds a "WSS Query Testing" test case with the two active wss tests to suite.
+ TCase *wss_query = tcase_create("WSS Query Testing");
+ tcase_add_test(wss_query, t_wss_query);
+ tcase_add_test(wss_query, t_buffer_wss_query);
+ //tcase_add_test(wss_query, t_wss_query_merge); // NOTE(review): no test of this name exists; the commented-out test in this file is t_range_query_merge
+ suite_add_tcase(suite, wss_query);
+}
diff --git a/tests/memisam_tests.cpp b/tests/memisam_tests.cpp
index b398524..9117ce3 100644
--- a/tests/memisam_tests.cpp
+++ b/tests/memisam_tests.cpp
@@ -12,21 +12,19 @@
#include "shard/ISAMTree.h"
#include "include/testing.h"
-#include "query/rangequery.h"
#include <check.h>
-
-
using namespace de;
-typedef ISAMTree<Rec> Shard;
+typedef Rec R;
+typedef ISAMTree<R> Shard;
#include "include/shard_standard.h"
#include "include/rangequery.h"
Suite *unit_testing()
{
- Suite *unit = suite_create("ISAMTree Shard Unit Testing");
+ Suite *unit = suite_create("Alias-augmented B+Tree Shard Unit Testing");
inject_rangequery_tests(unit);
inject_shard_tests(unit);
diff --git a/tests/pgm_tests.cpp b/tests/pgm_tests.cpp
index c7750ac..ee350de 100644
--- a/tests/pgm_tests.cpp
+++ b/tests/pgm_tests.cpp
@@ -1,7 +1,7 @@
/*
- * tests/irs_tests.cpp
+ * tests/pgm_tests.cpp
*
- * Unit tests for PGM (Augmented B+Tree) shard
+ * Unit tests for PGM shard
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
@@ -11,330 +11,23 @@
*/
#include "shard/PGM.h"
-#include "query/rangequery.h"
-#include "testing.h"
-
+#include "include/testing.h"
#include <check.h>
using namespace de;
-typedef PGM<Rec> Shard;
-
-START_TEST(t_mbuffer_init)
-{
- auto buffer = new MutableBuffer<Rec>(1024, 1024);
- for (uint64_t i = 512; i > 0; i--) {
- uint32_t v = i;
- buffer->append({i,v, 1});
- }
-
- for (uint64_t i = 1; i <= 256; ++i) {
- uint32_t v = i;
- buffer->append({i, v, 1}, true);
- }
-
- for (uint64_t i = 257; i <= 512; ++i) {
- uint32_t v = i + 1;
- buffer->append({i, v, 1});
- }
-
- Shard* shard = new Shard(buffer);
- ck_assert_uint_eq(shard->get_record_count(), 512);
-
- delete buffer;
- delete shard;
-}
-
-
-START_TEST(t_irs_init)
-{
- size_t n = 512;
- auto mbuffer1 = create_test_mbuffer<Rec>(n);
- auto mbuffer2 = create_test_mbuffer<Rec>(n);
- auto mbuffer3 = create_test_mbuffer<Rec>(n);
-
- auto shard1 = new Shard(mbuffer1);
- auto shard2 = new Shard(mbuffer2);
- auto shard3 = new Shard(mbuffer3);
-
- Shard* shards[3] = {shard1, shard2, shard3};
- auto shard4 = new Shard(shards, 3);
-
- ck_assert_int_eq(shard4->get_record_count(), n * 3);
- ck_assert_int_eq(shard4->get_tombstone_count(), 0);
-
- size_t total_cnt = 0;
- size_t shard1_idx = 0;
- size_t shard2_idx = 0;
- size_t shard3_idx = 0;
-
- for (size_t i = 0; i < shard4->get_record_count(); ++i) {
- auto rec1 = shard1->get_record_at(shard1_idx);
- auto rec2 = shard2->get_record_at(shard2_idx);
- auto rec3 = shard3->get_record_at(shard3_idx);
-
- auto cur_rec = shard4->get_record_at(i);
-
- if (shard1_idx < n && cur_rec->rec == rec1->rec) {
- ++shard1_idx;
- } else if (shard2_idx < n && cur_rec->rec == rec2->rec) {
- ++shard2_idx;
- } else if (shard3_idx < n && cur_rec->rec == rec3->rec) {
- ++shard3_idx;
- } else {
- assert(false);
- }
- }
-
- delete mbuffer1;
- delete mbuffer2;
- delete mbuffer3;
-
- delete shard1;
- delete shard2;
- delete shard3;
- delete shard4;
-}
-
-START_TEST(t_point_lookup)
-{
- size_t n = 10000;
-
- auto buffer = create_double_seq_mbuffer<Rec>(n, false);
- auto shard = Shard(buffer);
-
- for (size_t i=0; i<n; i++) {
- Rec r;
- auto rec = (buffer->get_data() + i);
- r.key = rec->rec.key;
- r.value = rec->rec.value;
-
- auto result = shard.point_lookup(r);
- ck_assert_ptr_nonnull(result);
- ck_assert_int_eq(result->rec.key, r.key);
- ck_assert_int_eq(result->rec.value, r.value);
- }
-
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_point_lookup_miss)
-{
- size_t n = 10000;
-
- auto buffer = create_double_seq_mbuffer<Rec>(n, false);
- auto isam = Shard(buffer);
-
- for (size_t i=n + 100; i<2*n; i++) {
- Rec r;
- r.key = i;
- r.value = i;
-
- auto result = isam.point_lookup(r);
- ck_assert_ptr_null(result);
- }
-
- delete buffer;
-}
-
-
-START_TEST(t_range_query)
-{
- auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
- auto shard = Shard(buffer);
-
- rq::Parms<Rec> parms;
- parms.lower_bound = 300;
- parms.upper_bound = 500;
-
- auto state = rq::Query<Shard, Rec>::get_query_state(&shard, &parms);
- auto result = rq::Query<Shard, Rec>::query(&shard, state, &parms);
- rq::Query<Shard, Rec>::delete_query_state(state);
-
- ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
- for (size_t i=0; i<result.size(); i++) {
- ck_assert_int_le(result[i].rec.key, parms.upper_bound);
- ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
- }
-
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_buffer_range_query)
-{
- auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
-
- rq::Parms<Rec> parms;
- parms.lower_bound = 300;
- parms.upper_bound = 500;
-
- auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer, &parms);
- auto result = rq::Query<Shard, Rec>::buffer_query(buffer, state, &parms);
- rq::Query<Shard, Rec>::delete_buffer_query_state(state);
-
- ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
- for (size_t i=0; i<result.size(); i++) {
- ck_assert_int_le(result[i].rec.key, parms.upper_bound);
- ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
- }
-
- delete buffer;
-}
-END_TEST
-
-
-START_TEST(t_range_query_merge)
-{
- auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
- auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
-
- auto shard1 = Shard(buffer1);
- auto shard2 = Shard(buffer2);
-
- rq::Parms<Rec> parms;
- parms.lower_bound = 150;
- parms.upper_bound = 500;
-
- size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200;
-
- auto state1 = rq::Query<Shard, Rec>::get_query_state(&shard1, &parms);
- auto state2 = rq::Query<Shard, Rec>::get_query_state(&shard2, &parms);
-
- std::vector<std::vector<de::Wrapped<Rec>>> results(2);
- results[0] = rq::Query<Shard, Rec>::query(&shard1, state1, &parms);
- results[1] = rq::Query<Shard, Rec>::query(&shard2, state2, &parms);
-
- rq::Query<Shard, Rec>::delete_query_state(state1);
- rq::Query<Shard, Rec>::delete_query_state(state2);
-
- ck_assert_int_eq(results[0].size() + results[1].size(), result_size);
-
- std::vector<std::vector<Wrapped<Rec>>> proc_results;
-
- for (size_t j=0; j<results.size(); j++) {
- proc_results.emplace_back(std::vector<Wrapped<Rec>>());
- for (size_t i=0; i<results[j].size(); i++) {
- proc_results[j].emplace_back(results[j][i]);
- }
- }
-
- auto result = rq::Query<Shard, Rec>::merge(proc_results, nullptr);
- std::sort(result.begin(), result.end());
-
- ck_assert_int_eq(result.size(), result_size);
- auto key = parms.lower_bound;
- for (size_t i=0; i<result.size(); i++) {
- ck_assert_int_eq(key++, result[i].key);
- if (key == 200) {
- key = 400;
- }
- }
-
- delete buffer1;
- delete buffer2;
-}
-END_TEST
-
-START_TEST(t_lower_bound)
-{
- auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
- auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
-
- de::PGM<Rec> *shards[2];
-
- auto shard1 = Shard(buffer1);
- auto shard2 = Shard(buffer2);
-
- shards[0] = &shard1;
- shards[1] = &shard2;
-
- auto merged = Shard(shards, 2);
-
- for (size_t i=100; i<1000; i++) {
- Rec r;
- r.key = i;
- r.value = i;
-
- auto idx = merged.get_lower_bound(i);
-
- assert(idx < merged.get_record_count());
-
- auto res = merged.get_record_at(idx);
-
- if (i >=200 && i <400) {
- ck_assert_int_lt(res->rec.key, i);
- } else {
- ck_assert_int_eq(res->rec.key, i);
- }
- }
-
- delete buffer1;
- delete buffer2;
-}
-END_TEST
-
-
-START_TEST(t_full_cancelation)
-{
- size_t n = 100;
- auto buffer = create_double_seq_mbuffer<Rec>(n, false);
- auto buffer_ts = create_double_seq_mbuffer<Rec>(n, true);
-
- Shard* shard = new Shard(buffer);
- Shard* shard_ts = new Shard(buffer_ts);
-
- ck_assert_int_eq(shard->get_record_count(), n);
- ck_assert_int_eq(shard->get_tombstone_count(), 0);
- ck_assert_int_eq(shard_ts->get_record_count(), n);
- ck_assert_int_eq(shard_ts->get_tombstone_count(), n);
-
- Shard* shards[] = {shard, shard_ts};
-
- Shard* merged = new Shard(shards, 2);
-
- ck_assert_int_eq(merged->get_tombstone_count(), 0);
- ck_assert_int_eq(merged->get_record_count(), 0);
-
- delete buffer;
- delete buffer_ts;
- delete shard;
- delete shard_ts;
- delete merged;
-}
-END_TEST
+typedef Rec R;
+typedef PGM<R> Shard;
+#include "include/shard_standard.h"
+#include "include/rangequery.h"
Suite *unit_testing()
{
Suite *unit = suite_create("PGM Shard Unit Testing");
- TCase *create = tcase_create("de::PGM constructor Testing");
- tcase_add_test(create, t_mbuffer_init);
- tcase_add_test(create, t_irs_init);
- tcase_set_timeout(create, 100);
- suite_add_tcase(unit, create);
-
-
- TCase *tombstone = tcase_create("de:PGM::tombstone cancellation Testing");
- tcase_add_test(tombstone, t_full_cancelation);
- suite_add_tcase(unit, tombstone);
-
-
- TCase *lookup = tcase_create("de:PGM:point_lookup Testing");
- tcase_add_test(lookup, t_point_lookup);
- tcase_add_test(lookup, t_point_lookup_miss);
- tcase_add_test(lookup, t_lower_bound);
- suite_add_tcase(unit, lookup);
-
- TCase *range_query = tcase_create("de:PGM::range_query Testing");
- tcase_add_test(range_query, t_range_query);
- tcase_add_test(range_query, t_buffer_range_query);
- tcase_add_test(range_query, t_range_query_merge);
- suite_add_tcase(unit, range_query);
+ inject_rangequery_tests(unit);
+ inject_shard_tests(unit);
return unit;
}
diff --git a/tests/rangecount_tests.cpp b/tests/rangecount_tests.cpp
index fe3a587..3be8234 100644
--- a/tests/rangecount_tests.cpp
+++ b/tests/rangecount_tests.cpp
@@ -19,6 +19,7 @@
using namespace de;
+typedef Rec R;
typedef ISAMTree<Rec> Shard;
#include "include/rangecount.h"
diff --git a/tests/rangequery_tests.cpp b/tests/rangequery_tests.cpp
index 49c73d3..bf5fb5e 100644
--- a/tests/rangequery_tests.cpp
+++ b/tests/rangequery_tests.cpp
@@ -19,7 +19,8 @@
using namespace de;
-typedef ISAMTree<Rec> Shard;
+typedef Rec R;
+typedef ISAMTree<R> Shard;
#include "include/rangequery.h"
diff --git a/tests/triespline_tests.cpp b/tests/triespline_tests.cpp
index 9df278f..e884360 100644
--- a/tests/triespline_tests.cpp
+++ b/tests/triespline_tests.cpp
@@ -12,19 +12,19 @@
#include "shard/TrieSpline.h"
#include "include/testing.h"
-#include "query/rangequery.h"
#include <check.h>
using namespace de;
-typedef TrieSpline<Rec> Shard;
+typedef Rec R;
+typedef TrieSpline<R> Shard;
#include "include/shard_standard.h"
#include "include/rangequery.h"
Suite *unit_testing()
{
- Suite *unit = suite_create("ISAMTree Shard Unit Testing");
+ Suite *unit = suite_create("Triespline Shard Unit Testing");
inject_rangequery_tests(unit);
inject_shard_tests(unit);