diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-03-19 11:10:02 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-03-19 18:14:57 -0400 |
| commit | 405bf4a20b4a22a6bb4b60b730b6a7e901fdccf6 (patch) | |
| tree | 1628557a62de01b8c5e69088d31d70100f45e862 | |
| parent | 481df63c0152e1b643ec0bd16500c4aca0716404 (diff) | |
| download | dynamic-extension-405bf4a20b4a22a6bb4b60b730b6a7e901fdccf6.tar.gz | |
FST Shard w/ tests
Needs some debugging--some methods currently fail within the library
itself.
The build system doesn't currently build the FST library. To compile,
you'll first need to manually build it, and then place the libFST.so file
in your LIBRARY_PATH and LD_LIBRARY_PATH.
| -rw-r--r-- | .gitmodules | 3 | ||||
| -rw-r--r-- | CMakeLists.txt | 9 | ||||
| m--------- | external/fast-succinct-trie | 0 | ||||
| -rw-r--r-- | include/shard/FSTrie.h | 266 | ||||
| -rw-r--r-- | tests/fst_tests.cpp | 55 |
5 files changed, 332 insertions, 1 deletions
diff --git a/.gitmodules b/.gitmodules index b4b36b7..38345a8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -19,3 +19,6 @@ [submodule "external/ctpl"] path = external/ctpl url = git@github.com:vit-vit/CTPL.git +[submodule "external/fast-succinct-trie"] + path = external/fast-succinct-trie + url = git@github.com:efficient/fast-succinct-trie.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 1bffafc..65afc9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench true) set(old_bench False) @@ -117,10 +117,17 @@ if (tests) target_compile_options(pgm_tests PUBLIC -fopenmp) + add_executable(triespline_debug ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_debug.cpp) target_link_libraries(triespline_debug PUBLIC gsl check subunit pthread atomic) target_link_options(triespline_debug PUBLIC -mcx16) target_include_directories(triespline_debug PRIVATE include external/psudb-common/cpp/include external/PLEX/include) + + + add_executable(fst_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/fst_tests.cpp) + target_link_libraries(fst_tests PUBLIC gsl check subunit pthread atomic FST) + target_link_options(fst_tests PUBLIC -mcx16) + target_include_directories(fst_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include external/fast-succinct-trie/fst/include) endif() if (bench) diff --git a/external/fast-succinct-trie b/external/fast-succinct-trie new file mode 160000 +Subproject 2ff7d982f567c128312ea80457fd95dc75acd34 diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h new file mode 100644 index 0000000..11a232d --- /dev/null +++ b/include/shard/FSTrie.h @@ -0,0 +1,266 @@ +/* + * include/shard/FSTrie.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * A shard shim around the FSTrie learned index. + * + * TODO: The code in this file is very poorly commented. + */ +#pragma once + + +#include <vector> + +#include "framework/ShardRequirements.h" +#include "FST.hpp" +#include "psu-ds/BloomFilter.h" +#include "util/bf_config.h" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template <KVPInterface R, size_t E=1024> +class FSTrie { +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + + static_assert(std::is_same_v<K, std::string> || std::is_same_v<K, uint64_t>, + "FST requires either string or uint64_t keys"); + +public: + FSTrie(BufferView<R> buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); + size_t cnt = 0; + std::vector<K> keys; + keys.reserve(buffer.get_record_count()); + + std::vector<size_t> values; + values.reserve(buffer.get_record_count()); + + size_t longest_key = 0; + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + for (size_t i=0; i<buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted()) { + continue; + } + + m_data[cnt] = temp_buffer[i]; + keys.push_back(m_data[cnt].rec.key); + values.push_back(cnt); + if constexpr (std::is_same_v<K, std::string>) { + if (m_data[cnt].rec.key.size() > longest_key) { + longest_key = m_data[cnt].rec.key.size(); + } + } + + cnt++; + } + + m_reccnt = cnt; + m_fst = FST(); + if constexpr (std::is_same_v<K, std::string>) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + + free(temp_buffer); + } + + FSTrie(std::vector<FSTrie*> &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); + + std::vector<K> keys; + keys.reserve(attemp_reccnt); + + std::vector<size_t> values; + values.reserve(attemp_reccnt); + + size_t longest_key = 0; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + m_data[m_reccnt] = *cursor.ptr; + keys.push_back(m_data[m_reccnt].rec.key); + values.push_back(m_data[m_reccnt].rec.value); + + if constexpr (std::is_same_v<K, std::string>) { + if (m_data[m_reccnt].rec.key.size() > longest_key) { + longest_key = m_data[m_reccnt].rec.key.size(); + } + } + + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_fst = FST(); + if constexpr (std::is_same_v<K, std::string>) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + } + } + + ~FSTrie() { + free(m_data); + } + + Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + size_t idx; + bool res; + if constexpr (std::is_same_v<K, std::string>) { + res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx); + } else { + res = m_fst.lookup(rec.key, idx); + } + + if (res) { + return m_data + idx; + } + + return nullptr; + } + + Wrapped<R>* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return 0; + } + + const Wrapped<R>* get_record_at(size_t idx) const { + if (idx >= m_reccnt) return nullptr; + return m_data + idx; + } + + + size_t get_memory_usage() { + return m_fst.mem() + m_alloc_size; + } + + size_t get_aux_memory_usage() { + return 0; + } + + size_t get_lower_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v<K, std::string>) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + return itr.value(); + } + + size_t get_upper_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v<K, std::string>) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + size_t idx = itr.value(); + while (idx < m_reccnt && m_data[idx].rec.key <= key) { + idx++; + } + + return idx; + } + +private: + + Wrapped<R>* m_data; + size_t m_reccnt; + size_t m_alloc_size; + FST m_fst; +}; +} diff --git a/tests/fst_tests.cpp b/tests/fst_tests.cpp new file mode 100644 index 0000000..298c104 --- /dev/null +++ b/tests/fst_tests.cpp @@ -0,0 +1,55 @@ +/* + * tests/isam_tests.cpp + * + * Unit tests for ISAM Tree shard + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ + +#include "shard/FSTrie.h" +#include "include/testing.h" +#include <check.h> + +using namespace de; + +typedef Rec R; +typedef FSTrie<R> Shard; + +#include "include/shard_standard.h" +#include "include/rangequery.h" + +Suite *unit_testing() +{ + Suite *unit = suite_create("Fast-succinct Trie Shard Unit Testing"); + + inject_rangequery_tests(unit); + inject_shard_tests(unit); + + return unit; +} + + +int shard_unit_tests() +{ + int failed = 0; + Suite *unit = unit_testing(); + SRunner *unit_shardner = srunner_create(unit); + + srunner_run_all(unit_shardner, CK_NORMAL); + failed = srunner_ntests_failed(unit_shardner); + srunner_free(unit_shardner); + + return failed; +} + + +int main() +{ + int unit_failed = shard_unit_tests(); + + return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} |