diff options
| -rw-r--r-- | .gitmodules | 3 | ||||
| -rw-r--r-- | CMakeLists.txt | 9 | ||||
| m--------- | external/fast-succinct-trie | 0 | ||||
| -rw-r--r-- | include/shard/FSTrie.h | 266 | ||||
| -rw-r--r-- | tests/fst_tests.cpp | 55 |
5 files changed, 332 insertions, 1 deletions
diff --git a/.gitmodules b/.gitmodules index b4b36b7..38345a8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -19,3 +19,6 @@ [submodule "external/ctpl"] path = external/ctpl url = git@github.com:vit-vit/CTPL.git +[submodule "external/fast-succinct-trie"] + path = external/fast-succinct-trie + url = git@github.com:efficient/fast-succinct-trie.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 1bffafc..65afc9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench true) set(old_bench False) @@ -117,10 +117,17 @@ if (tests) target_compile_options(pgm_tests PUBLIC -fopenmp) + add_executable(triespline_debug ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_debug.cpp) target_link_libraries(triespline_debug PUBLIC gsl check subunit pthread atomic) target_link_options(triespline_debug PUBLIC -mcx16) target_include_directories(triespline_debug PRIVATE include external/psudb-common/cpp/include external/PLEX/include) + + + add_executable(fst_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/fst_tests.cpp) + target_link_libraries(fst_tests PUBLIC gsl check subunit pthread atomic FST) + target_link_options(fst_tests PUBLIC -mcx16) + target_include_directories(fst_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include external/fast-succinct-trie/fst/include) endif() if (bench) diff --git a/external/fast-succinct-trie b/external/fast-succinct-trie new file mode 160000 +Subproject 2ff7d982f567c128312ea80457fd95dc75acd34 diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h new file mode 100644 index 0000000..11a232d --- /dev/null +++ b/include/shard/FSTrie.h @@ -0,0 +1,266 @@ +/* + * include/shard/FSTrie.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * + * Distributed under the Modified BSD License. + * + * A shard shim around the FSTrie learned index. + * + * TODO: The code in this file is very poorly commented. + */ +#pragma once + + +#include <vector> + +#include "framework/ShardRequirements.h" +#include "FST.hpp" +#include "psu-ds/BloomFilter.h" +#include "util/bf_config.h" +#include "util/SortedMerge.h" + +using psudb::CACHELINE_SIZE; +using psudb::BloomFilter; +using psudb::PriorityQueue; +using psudb::queue_record; +using psudb::byte; + +namespace de { + +template <KVPInterface R, size_t E=1024> +class FSTrie { +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + + static_assert(std::is_same_v<K, std::string> || std::is_same_v<K, uint64_t>, + "FST requires either string or uint64_t keys"); + +public: + FSTrie(BufferView<R> buffer) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped<R>), + (byte**) &m_data); + size_t cnt = 0; + std::vector<K> keys; + keys.reserve(buffer.get_record_count()); + + std::vector<size_t> values; + values.reserve(buffer.get_record_count()); + + size_t longest_key = 0; + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE, + buffer.get_record_count(), + sizeof(Wrapped<R>)); + buffer.copy_to_buffer((byte *) temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less<Wrapped<R>>()); + + for (size_t i=0; i<buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted()) { + continue; + } + + m_data[cnt] = temp_buffer[i]; + keys.push_back(m_data[cnt].rec.key); + values.push_back(cnt); + if constexpr (std::is_same_v<K, std::string>) { + if (m_data[cnt].rec.key.size() > longest_key) { + longest_key = m_data[cnt].rec.key.size(); + } + } + + cnt++; + } + + m_reccnt = cnt; + m_fst = FST(); + if constexpr (std::is_same_v<K, std::string>) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + + free(temp_buffer); + } + + FSTrie(std::vector<FSTrie*> &shards) + : m_data(nullptr) + , m_reccnt(0) + , m_alloc_size(0) + { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped<R>), + (byte **) &m_data); + + std::vector<K> keys; + keys.reserve(attemp_reccnt); + + std::vector<size_t> values; + values.reserve(attemp_reccnt); + + size_t longest_key = 0; + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue<Wrapped<R>> pq(cursors.size()); + for (size_t i=0; i<cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + m_data[m_reccnt] = *cursor.ptr; + keys.push_back(m_data[m_reccnt].rec.key); + values.push_back(m_data[m_reccnt].rec.value); + + if constexpr (std::is_same_v<K, std::string>) { + if (m_data[m_reccnt].rec.key.size() > longest_key) { + longest_key = m_data[m_reccnt].rec.key.size(); + } + } + + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + m_fst = FST(); + if constexpr (std::is_same_v<K, std::string>) { + m_fst.load(keys, values, longest_key); + } else { + m_fst.load(keys, values); + } + } + } + + ~FSTrie() { + free(m_data); + } + + Wrapped<R> *point_lookup(const R &rec, bool filter=false) { + size_t idx; + bool res; + if constexpr (std::is_same_v<K, std::string>) { + res = m_fst.lookup(rec.key.c_str(), rec.key.size(), idx); + } else { + res = m_fst.lookup(rec.key, idx); + } + + if (res) { + return m_data + idx; + } + + return nullptr; + } + + Wrapped<R>* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return 0; + } + + const Wrapped<R>* get_record_at(size_t idx) const { + if (idx >= m_reccnt) return nullptr; + return m_data + idx; + } + + + size_t get_memory_usage() { + return m_fst.mem() + m_alloc_size; + } + + size_t get_aux_memory_usage() { + return 0; + } + + size_t get_lower_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v<K, std::string>) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + return itr.value(); + } + + size_t get_upper_bound(const K& key) { + auto itr = FSTIter(); + + const K temp_key = key; + + bool res; + if constexpr (std::is_same_v<K, std::string>) { + res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); + } else { + res = m_fst.lowerBound(temp_key, itr); + } + + size_t idx = itr.value(); + while (idx < m_reccnt && m_data[idx].rec.key <= key) { + idx++; + } + + return idx; + } + +private: + + Wrapped<R>* m_data; + size_t m_reccnt; + size_t m_alloc_size; + FST m_fst; +}; +} diff --git a/tests/fst_tests.cpp b/tests/fst_tests.cpp new file mode 100644 index 0000000..298c104 --- /dev/null +++ b/tests/fst_tests.cpp @@ -0,0 +1,55 @@ +/* + * tests/isam_tests.cpp + * + * Unit tests for ISAM Tree shard + * + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ + +#include "shard/FSTrie.h" +#include "include/testing.h" +#include <check.h> + +using namespace de; + +typedef Rec R; +typedef FSTrie<R> Shard; + +#include "include/shard_standard.h" +#include "include/rangequery.h" + +Suite *unit_testing() +{ + Suite *unit = suite_create("Fast-succinct Trie Shard Unit Testing"); + + inject_rangequery_tests(unit); + inject_shard_tests(unit); + + return unit; +} + + +int shard_unit_tests() +{ + int failed = 0; + Suite *unit = unit_testing(); + SRunner *unit_shardner = srunner_create(unit); + + srunner_run_all(unit_shardner, CK_NORMAL); + failed = srunner_ntests_failed(unit_shardner); + srunner_free(unit_shardner); + + return failed; +} + + +int main() +{ + int unit_failed = shard_unit_tests(); + + return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} |