summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitmodules3
-rw-r--r--CMakeLists.txt9
m---------external/fast-succinct-trie0
-rw-r--r--include/shard/FSTrie.h266
-rw-r--r--tests/fst_tests.cpp55
5 files changed, 332 insertions, 1 deletions
diff --git a/.gitmodules b/.gitmodules
index b4b36b7..38345a8 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -19,3 +19,6 @@
[submodule "external/ctpl"]
path = external/ctpl
url = git@github.com:vit-vit/CTPL.git
+[submodule "external/fast-succinct-trie"]
+ path = external/fast-succinct-trie
+ url = git@github.com:efficient/fast-succinct-trie.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1bffafc..65afc9f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench true)
set(old_bench False)
@@ -117,10 +117,17 @@ if (tests)
target_compile_options(pgm_tests PUBLIC -fopenmp)
+
add_executable(triespline_debug ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_debug.cpp)
target_link_libraries(triespline_debug PUBLIC gsl check subunit pthread atomic)
target_link_options(triespline_debug PUBLIC -mcx16)
target_include_directories(triespline_debug PRIVATE include external/psudb-common/cpp/include external/PLEX/include)
+
+
+ add_executable(fst_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/fst_tests.cpp)
+ target_link_libraries(fst_tests PUBLIC gsl check subunit pthread atomic FST)
+ target_link_options(fst_tests PUBLIC -mcx16)
+ target_include_directories(fst_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include external/fast-succinct-trie/fst/include)
endif()
if (bench)
diff --git a/external/fast-succinct-trie b/external/fast-succinct-trie
new file mode 160000
+Subproject 2ff7d982f567c128312ea80457fd95dc75acd34
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
new file mode 100644
index 0000000..11a232d
--- /dev/null
+++ b/include/shard/FSTrie.h
@@ -0,0 +1,266 @@
+/*
+ * include/shard/FSTrie.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the FSTrie learned index.
+ *
+ * TODO: The code in this file is very poorly commented.
+ */
+#pragma once
+
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "FST.hpp"
+#include "psu-ds/BloomFilter.h"
+#include "util/bf_config.h"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R, size_t E=1024>
+class FSTrie {
+private:
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+
+ static_assert(std::is_same_v<K, std::string> || std::is_same_v<K, uint64_t>,
+ "FST requires either string or uint64_t keys");
+
+public:
+ FSTrie(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ buffer.get_record_count() *
+ sizeof(Wrapped<R>),
+ (byte**) &m_data);
+ size_t cnt = 0;
+ std::vector<K> keys;
+ keys.reserve(buffer.get_record_count());
+
+ std::vector<size_t> values;
+ values.reserve(buffer.get_record_count());
+
+ size_t longest_key = 0;
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+ buffer.get_record_count(),
+ sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *) temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted()) {
+ continue;
+ }
+
+ m_data[cnt] = temp_buffer[i];
+ keys.push_back(m_data[cnt].rec.key);
+ values.push_back(cnt);
+ if constexpr (std::is_same_v<K, std::string>) {
+ if (m_data[cnt].rec.key.size() > longest_key) {
+ longest_key = m_data[cnt].rec.key.size();
+ }
+ }
+
+ cnt++;
+ }
+
+ m_reccnt = cnt;
+ m_fst = FST();
+ if constexpr (std::is_same_v<K, std::string>) {
+ m_fst.load(keys, values, longest_key);
+ } else {
+ m_fst.load(keys, values);
+ }
+
+ free(temp_buffer);
+ }
+
+ FSTrie(std::vector<FSTrie*> &shards)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
+ attemp_reccnt * sizeof(Wrapped<R>),
+ (byte **) &m_data);
+
+ std::vector<K> keys;
+ keys.reserve(attemp_reccnt);
+
+ std::vector<size_t> values;
+ values.reserve(attemp_reccnt);
+
+ size_t longest_key = 0;
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ m_data[m_reccnt] = *cursor.ptr;
+ keys.push_back(m_data[m_reccnt].rec.key);
+ values.push_back(m_data[m_reccnt].rec.value);
+
+ if constexpr (std::is_same_v<K, std::string>) {
+ if (m_data[m_reccnt].rec.key.size() > longest_key) {
+ longest_key = m_data[m_reccnt].rec.key.size();
+ }
+ }
+
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ if (m_reccnt > 0) {
+ m_fst = FST();
+ if constexpr (std::is_same_v<K, std::string>) {
+ m_fst.load(keys, values, longest_key);
+ } else {
+ m_fst.load(keys, values);
+ }
+ }
+ }
+
+ ~FSTrie() {
+ free(m_data);
+ }
+
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ size_t idx;
+ bool res;
+ if constexpr (std::is_same_v<K, std::string>) {
+ res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx);
+ } else {
+ res = m_fst.lookup(rec.key, idx);
+ }
+
+ if (res) {
+ return m_data + idx;
+ }
+
+ return nullptr;
+ }
+
+ Wrapped<R>* get_data() const {
+ return m_data;
+ }
+
+ size_t get_record_count() const {
+ return m_reccnt;
+ }
+
+ size_t get_tombstone_count() const {
+ return 0;
+ }
+
+ const Wrapped<R>* get_record_at(size_t idx) const {
+ if (idx >= m_reccnt) return nullptr;
+ return m_data + idx;
+ }
+
+
+ size_t get_memory_usage() {
+ return m_fst.mem() + m_alloc_size;
+ }
+
+ size_t get_aux_memory_usage() {
+ return 0;
+ }
+
+ size_t get_lower_bound(const K& key) {
+ auto itr = FSTIter();
+
+ const K temp_key = key;
+
+ bool res;
+ if constexpr (std::is_same_v<K, std::string>) {
+ res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr);
+ } else {
+ res = m_fst.lowerBound(temp_key, itr);
+ }
+
+ return itr.value();
+ }
+
+ size_t get_upper_bound(const K& key) {
+ auto itr = FSTIter();
+
+ const K temp_key = key;
+
+ bool res;
+ if constexpr (std::is_same_v<K, std::string>) {
+ res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr);
+ } else {
+ res = m_fst.lowerBound(temp_key, itr);
+ }
+
+ size_t idx = itr.value();
+ while (idx < m_reccnt && m_data[idx].rec.key <= key) {
+ idx++;
+ }
+
+ return idx;
+ }
+
+private:
+
+ Wrapped<R>* m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ FST m_fst;
+};
+}
diff --git a/tests/fst_tests.cpp b/tests/fst_tests.cpp
new file mode 100644
index 0000000..298c104
--- /dev/null
+++ b/tests/fst_tests.cpp
@@ -0,0 +1,55 @@
+/*
+ * tests/isam_tests.cpp
+ *
+ * Unit tests for ISAM Tree shard
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+
+#include "shard/FSTrie.h"
+#include "include/testing.h"
+#include <check.h>
+
+using namespace de;
+
+typedef Rec R;
+typedef FSTrie<R> Shard;
+
+#include "include/shard_standard.h"
+#include "include/rangequery.h"
+
+Suite *unit_testing()
+{
+ Suite *unit = suite_create("Fast-succinct Trie Shard Unit Testing");
+
+ inject_rangequery_tests(unit);
+ inject_shard_tests(unit);
+
+ return unit;
+}
+
+
+int shard_unit_tests()
+{
+ int failed = 0;
+ Suite *unit = unit_testing();
+ SRunner *unit_shardner = srunner_create(unit);
+
+ srunner_run_all(unit_shardner, CK_NORMAL);
+ failed = srunner_ntests_failed(unit_shardner);
+ srunner_free(unit_shardner);
+
+ return failed;
+}
+
+
+int main()
+{
+ int unit_failed = shard_unit_tests();
+
+ return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}