From 147f0df58e1ff4973bffb7e4628e6b2fdc20eb57 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Fri, 22 Mar 2024 14:04:40 -0400 Subject: FSTrie testing and debugging --- benchmarks/string_insertion_tput.cpp | 92 ++++++++++++++++++++++++++ include/shard/FSTrie.h | 121 ++++++----------------------------- tests/include/shard_string.h | 8 +-- tests/include/testing.h | 6 +- 4 files changed, 118 insertions(+), 109 deletions(-) create mode 100644 benchmarks/string_insertion_tput.cpp diff --git a/benchmarks/string_insertion_tput.cpp b/benchmarks/string_insertion_tput.cpp new file mode 100644 index 0000000..d205175 --- /dev/null +++ b/benchmarks/string_insertion_tput.cpp @@ -0,0 +1,92 @@ +/* + * + */ + +#define ENABLE_TIMER + +#include +#include + +#include "framework/DynamicExtension.h" +#include "shard/FSTrie.h" +#include "query/rangequery.h" +#include "framework/interface/Record.h" + +#include "psu-util/timer.h" +#include "psu-util/progress.h" + + +typedef de::Record Rec; +typedef de::FSTrie Trie; +typedef de::rq::Query Q; +typedef de::DynamicExtension Ext; //, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext; + +std::vector strings; + +void insert_thread(int64_t start, int64_t end, Ext *extension) { + for (uint64_t i=start; iinsert(r)) { + _mm_pause(); + } + } +} + +void read_data(std::string fname, size_t n=10000000) { + strings.reserve(n); + + std::fstream file; + file.open(fname, std::ios::in); + + size_t i=0; + std::string line; + while (i < n && std::getline(file, line, '\n')) { + strings.emplace_back(line); + i++; + psudb::progress_update((double) i / (double) n, "Reading file:"); + } +} + +int main(int argc, char **argv) { + size_t n = 100000000; + + std::vector counts = {1 , 2, 4, 8}; //, 16, 32, 64}; + // + read_data("benchmarks/data/ursa-genome.txt", n); + + fprintf(stderr, "Finished reading from file.\n"); + + for (auto thread_count : counts) { + + auto extension = new Ext(1000, 12000, 8); + + size_t per_thread = n / thread_count; + + std::thread threads[thread_count]; + + TIMER_INIT(); + TIMER_START(); + for (size_t i=0; iget_record_count(), + thread_count, tput); + + delete extension; + } + + fflush(stderr); +} + diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 62aa0b7..aa3c9f4 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -1,13 +1,11 @@ /* * include/shard/FSTrie.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2024 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * * A shard shim around the FSTrie learned index. - * - * TODO: The code in this file is very poorly commented. */ #pragma once @@ -15,9 +13,7 @@ #include #include "framework/ShardRequirements.h" -#include "FST.hpp" -#include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" +#include "fst.hpp" #include "util/SortedMerge.h" using psudb::CACHELINE_SIZE; @@ -28,14 +24,13 @@ using psudb::byte; namespace de { -template +template class FSTrie { private: + typedef decltype(R::key) K; typedef decltype(R::value) V; - - static_assert(std::is_same_v || std::is_same_v, - "FST requires either string or uint64_t keys"); + static_assert(std::is_same_v, "FST requires std::string keys."); public: FSTrie(BufferView buffer) @@ -50,22 +45,12 @@ public: std::vector keys; keys.reserve(buffer.get_record_count()); - std::vector values; - values.reserve(buffer.get_record_count()); - - size_t longest_key = 0; - /* * Copy the contents of the buffer view into a temporary buffer, and * sort them. We still need to iterate over these temporary records to * apply tombstone/deleted record filtering, as well as any possible * per-record processing that is required by the shard being built. */ - /* - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped)); - */ auto temp_buffer = new Wrapped[buffer.get_record_count()](); for (size_t i=0; i>()); for (size_t i=0; i) { - if (m_data[cnt].rec.key.size() > longest_key) { - longest_key = m_data[cnt].rec.key.size(); - } - } - cnt++; } @@ -99,11 +77,8 @@ public: } m_reccnt = cnt; - m_fst = FST(); - if constexpr (std::is_same_v) { - m_fst.load(keys, values, longest_key); - } else { - m_fst.load(keys, values); + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys); } delete[] temp_buffer; @@ -124,10 +99,6 @@ public: std::vector keys; keys.reserve(attemp_reccnt); - std::vector values; - values.reserve(attemp_reccnt); - - size_t longest_key = 0; // FIXME: For smaller cursor arrays, it may be more efficient to skip // the priority queue and just do a scan. PriorityQueue> pq(cursors.size()); @@ -155,16 +126,9 @@ public: } else { auto& cursor = cursors[now.version]; /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { m_data[m_reccnt] = *cursor.ptr; keys.push_back(m_data[m_reccnt].rec.key); - values.push_back(m_data[m_reccnt].rec.value); - - if constexpr (std::is_same_v) { - if (m_data[m_reccnt].rec.key.size() > longest_key) { - longest_key = m_data[m_reccnt].rec.key.size(); - } - } m_reccnt++; } @@ -179,37 +143,24 @@ public: } if (m_reccnt > 0) { - m_fst = FST(); - if constexpr (std::is_same_v) { - m_fst.load(keys, values, longest_key); - } else { - m_fst.load(keys, values); - } + m_fst = new fst::Trie(keys); } } ~FSTrie() { delete[] m_data; + delete m_fst; } Wrapped *point_lookup(const R &rec, bool filter=false) { - size_t idx; - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lookup((uint8_t*)rec.key.c_str(), rec.key.size(), idx); - } else { - res = m_fst.lookup(rec.key, idx); - } - if (res && m_data[idx].rec.key != rec.key) { - fprintf(stderr, "ERROR!\n"); - } + auto idx = m_fst->exactSearch(rec.key); - if (res) { - return m_data + idx; + if (idx == fst::kNotFound) { + return nullptr; } - return nullptr; + return m_data + idx; } Wrapped* get_data() const { @@ -231,53 +182,21 @@ public: size_t get_memory_usage() { - return m_fst.mem() + m_alloc_size; + return m_fst->getMemoryUsage() + m_alloc_size; } size_t get_aux_memory_usage() { return 0; } - size_t get_lower_bound(const K& key) { - auto itr = FSTIter(); - - const K temp_key = key; - - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); - } else { - res = m_fst.lowerBound(temp_key, itr); - } - - return itr.value(); - } - - size_t get_upper_bound(const K& key) { - auto itr = FSTIter(); - - const K temp_key = key; - - bool res; - if constexpr (std::is_same_v) { - res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr); - } else { - res = m_fst.lowerBound(temp_key, itr); - } - - size_t idx = itr.value(); - while (idx < m_reccnt && m_data[idx].rec.key <= key) { - idx++; - } - - return idx; - } + size_t get_lower_bound(R &rec) {return 0;} + size_t get_upper_bound(R &rec) {return 0;} private: Wrapped* m_data; size_t m_reccnt; size_t m_alloc_size; - FST m_fst; + fst::Trie *m_fst; }; } diff --git a/tests/include/shard_string.h b/tests/include/shard_string.h index 27ee782..fa51630 100644 --- a/tests/include/shard_string.h +++ b/tests/include/shard_string.h @@ -58,7 +58,7 @@ START_TEST(t_mbuffer_init) START_TEST(t_shard_init) { - size_t n = 512; + size_t n = 2048; auto mbuffer1 = create_test_mbuffer(n); auto mbuffer2 = create_test_mbuffer(n); auto mbuffer3 = create_test_mbuffer(n); @@ -117,16 +117,14 @@ START_TEST(t_point_lookup) auto view = buffer->get_buffer_view(); for (size_t i=0; irec.key; - r.value = rec->rec.value; + R r = rec->rec; auto result = shard.point_lookup(r); ck_assert_ptr_nonnull(result); ck_assert_str_eq(result->rec.key.c_str(), r.key.c_str()); ck_assert_int_eq(result->rec.value, r.value); - fprintf(stderr, "%ld\n", i); + //fprintf(stderr, "%ld\n", i); } } diff --git a/tests/include/testing.h b/tests/include/testing.h index a3c54c0..2315daa 100644 --- a/tests/include/testing.h +++ b/tests/include/testing.h @@ -51,7 +51,7 @@ static std::vector read_string_data(std::string fname, size_t n) { r.value = atol(field.c_str()); std::getline(ls, field, '\n'); r.key = std::string(field); - + vec.push_back(r); } @@ -116,8 +116,8 @@ static de::MutableBuffer *create_test_mbuffer(size_t cnt) auto buffer = new de::MutableBuffer(cnt/2, cnt); R rec; - if constexpr (de::KVPInterface) { - if constexpr (std::is_same_v) { + if constexpr (de::KVPInterface){ + if constexpr (std::is_same_v) { auto records = read_string_data(kjv_wordlist, cnt); for (size_t i=0; i) { -- cgit v1.2.3