summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-03-22 14:04:40 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2024-03-22 14:04:40 -0400
commit147f0df58e1ff4973bffb7e4628e6b2fdc20eb57 (patch)
treefac1c176f59d0c0428cce2b3b514c2708829985d
parent10c2348664a0341764b6a773aaa58f2af93075ad (diff)
downloaddynamic-extension-147f0df58e1ff4973bffb7e4628e6b2fdc20eb57.tar.gz
FSTrie testing and debugging
-rw-r--r--benchmarks/string_insertion_tput.cpp92
-rw-r--r--include/shard/FSTrie.h121
-rw-r--r--tests/include/shard_string.h8
-rw-r--r--tests/include/testing.h6
4 files changed, 118 insertions, 109 deletions
diff --git a/benchmarks/string_insertion_tput.cpp b/benchmarks/string_insertion_tput.cpp
new file mode 100644
index 0000000..d205175
--- /dev/null
+++ b/benchmarks/string_insertion_tput.cpp
@@ -0,0 +1,92 @@
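+/*
+ * benchmarks/string_insertion_tput.cpp: multithreaded string insertion throughput benchmark for the FSTrie shard.
+ */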
+
+#define ENABLE_TIMER
+
+#include <fstream>
+#include <sstream>
+
+#include "framework/DynamicExtension.h"
+#include "shard/FSTrie.h"
+#include "query/rangequery.h"
+#include "framework/interface/Record.h"
+
+#include "psu-util/timer.h"
+#include "psu-util/progress.h"
+
+
+using Rec = de::Record<std::string, uint64_t>; /* string keys, uint64_t values */
+using Trie = de::FSTrie<Rec>;                  /* shard type being benchmarked */
+using Q = de::rq::Query<Rec, Trie>;
+using Ext = de::DynamicExtension<Rec, Trie, Q>; //, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext;
+
+std::vector<std::string> strings; /* filled by read_data(), read by the insert threads */
+
+/* Worker body: inserts strings[start, end), each paired with its index, into */
+/* the extension, pausing and retrying for as long as an insert is rejected.  */
+void insert_thread(int64_t start, int64_t end, Ext *extension) {
+    for (uint64_t idx = start; idx < end; ++idx) {
+        Rec rec = {strings[idx], idx};
+        while (!extension->insert(rec)) _mm_pause();
+    }
+}
+
+/* Appends up to n lines of fname to the global strings vector, updating a progress display. */
+void read_data(std::string fname, size_t n=10000000) {
+    std::fstream file(fname, std::ios::in);
+    if (!file.is_open()) { /* fail loudly rather than silently loading nothing */
+        fprintf(stderr, "Unable to open %s for reading.\n", fname.c_str());
+        return;
+    }
+    strings.reserve(n);
+    std::string line;
+    for (size_t i = 1; i <= n && std::getline(file, line, '\n'); i++) {
+        strings.emplace_back(line);
+        psudb::progress_update((double) i / (double) n, "Reading file:");
+    }
+}
+
+/* Times multithreaded insertion at each thread count; prints one tab-separated row per run. */
+int main(int argc, char **argv) {
+    size_t n = 100000000; /* upper bound on the number of keys to load */
+    std::vector<int> counts = {1 , 2, 4, 8}; //, 16, 32, 64};
+
+    read_data("benchmarks/data/ursa-genome.txt", n);
+    fprintf(stderr, "Finished reading from file.\n");
+
+    /* The file may hold fewer than n lines; never index past what was loaded. */
+    n = strings.size();
+    if (n == 0) {
+        fprintf(stderr, "No records were loaded; nothing to benchmark.\n");
+        return 1;
+    }
+
+    for (auto thread_count : counts) {
+        auto extension = new Ext(1000, 12000, 8);
+        size_t per_thread = n / thread_count;
+
+        /* std::vector instead of a variable-length array, which is not standard C++ */
+        std::vector<std::thread> threads;
+        threads.reserve(thread_count);
+        TIMER_INIT();
+        TIMER_START();
+        for (int i = 0; i < thread_count; i++) {
+            /* the last worker also takes the remainder of n / thread_count */
+            size_t stop = (i + 1 == thread_count) ? n : (i + 1) * per_thread;
+            threads.emplace_back(insert_thread, i * per_thread, stop, extension);
+        }
+        for (auto &worker : threads) {
+            worker.join();
+        }
+        TIMER_STOP();
+
+        auto total_time = TIMER_RESULT();
+        double tput = (double) n / (double) total_time * 1e9;
+        fprintf(stdout, "%ld\t%d\t%lf\n", extension->get_record_count(),
+                thread_count, tput);
+        delete extension;
+    }
+    fflush(stderr);
+}
+
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
index 62aa0b7..aa3c9f4 100644
--- a/include/shard/FSTrie.h
+++ b/include/shard/FSTrie.h
@@ -1,13 +1,11 @@
/*
* include/shard/FSTrie.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
* A shard shim around the FSTrie learned index.
- *
- * TODO: The code in this file is very poorly commented.
*/
#pragma once
@@ -15,9 +13,7 @@
#include <vector>
#include "framework/ShardRequirements.h"
-#include "FST.hpp"
-#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
+#include "fst.hpp"
#include "util/SortedMerge.h"
using psudb::CACHELINE_SIZE;
@@ -28,14 +24,13 @@ using psudb::byte;
namespace de {
-template <KVPInterface R, size_t E=1024>
+template <KVPInterface R>
class FSTrie {
private:
+
typedef decltype(R::key) K;
typedef decltype(R::value) V;
-
- static_assert(std::is_same_v<K, std::string> || std::is_same_v<K, uint64_t>,
- "FST requires either string or uint64_t keys");
+ static_assert(std::is_same_v<K, std::string>, "FST requires std::string keys.");
public:
FSTrie(BufferView<R> buffer)
@@ -50,22 +45,12 @@ public:
std::vector<K> keys;
keys.reserve(buffer.get_record_count());
- std::vector<size_t> values;
- values.reserve(buffer.get_record_count());
-
- size_t longest_key = 0;
-
/*
* Copy the contents of the buffer view into a temporary buffer, and
* sort them. We still need to iterate over these temporary records to
* apply tombstone/deleted record filtering, as well as any possible
* per-record processing that is required by the shard being built.
*/
- /*
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
- buffer.get_record_count(),
- sizeof(Wrapped<R>));
- */
auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
for (size_t i=0; i<buffer.get_record_count(); i++) {
temp_buffer[i] = *(buffer.get(i));
@@ -76,21 +61,14 @@ public:
std::sort(base, stop, std::less<Wrapped<R>>());
for (size_t i=0; i<buffer.get_record_count(); i++) {
- if (temp_buffer[i].is_deleted()) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible()) {
continue;
}
m_data[cnt] = temp_buffer[i];
- m_data[cnt].header = 0;
+ m_data[cnt].clear_timestamp();
keys.push_back(m_data[cnt].rec.key);
- values.push_back(cnt);
- if constexpr (std::is_same_v<K, std::string>) {
- if (m_data[cnt].rec.key.size() > longest_key) {
- longest_key = m_data[cnt].rec.key.size();
- }
- }
-
cnt++;
}
@@ -99,11 +77,8 @@ public:
}
m_reccnt = cnt;
- m_fst = FST();
- if constexpr (std::is_same_v<K, std::string>) {
- m_fst.load(keys, values, longest_key);
- } else {
- m_fst.load(keys, values);
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys);
}
delete[] temp_buffer;
@@ -124,10 +99,6 @@ public:
std::vector<K> keys;
keys.reserve(attemp_reccnt);
- std::vector<size_t> values;
- values.reserve(attemp_reccnt);
-
- size_t longest_key = 0;
// FIXME: For smaller cursor arrays, it may be more efficient to skip
// the priority queue and just do a scan.
PriorityQueue<Wrapped<R>> pq(cursors.size());
@@ -155,16 +126,9 @@ public:
} else {
auto& cursor = cursors[now.version];
/* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted()) {
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
m_data[m_reccnt] = *cursor.ptr;
keys.push_back(m_data[m_reccnt].rec.key);
- values.push_back(m_data[m_reccnt].rec.value);
-
- if constexpr (std::is_same_v<K, std::string>) {
- if (m_data[m_reccnt].rec.key.size() > longest_key) {
- longest_key = m_data[m_reccnt].rec.key.size();
- }
- }
m_reccnt++;
}
@@ -179,37 +143,24 @@ public:
}
if (m_reccnt > 0) {
- m_fst = FST();
- if constexpr (std::is_same_v<K, std::string>) {
- m_fst.load(keys, values, longest_key);
- } else {
- m_fst.load(keys, values);
- }
+ m_fst = new fst::Trie(keys);
}
}
~FSTrie() {
delete[] m_data;
+ delete m_fst;
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- size_t idx;
- bool res;
- if constexpr (std::is_same_v<K, std::string>) {
- res = m_fst.lookup((uint8_t*)rec.key.c_str(), rec.key.size(), idx);
- } else {
- res = m_fst.lookup(rec.key, idx);
- }
- if (res && m_data[idx].rec.key != rec.key) {
- fprintf(stderr, "ERROR!\n");
- }
+ auto idx = m_fst->exactSearch(rec.key);
- if (res) {
- return m_data + idx;
+ if (idx == fst::kNotFound) {
+ return nullptr;
}
- return nullptr;
+ return m_data + idx;
}
Wrapped<R>* get_data() const {
@@ -231,53 +182,21 @@ public:
size_t get_memory_usage() {
- return m_fst.mem() + m_alloc_size;
+ return m_fst->getMemoryUsage() + m_alloc_size;
}
size_t get_aux_memory_usage() {
return 0;
}
- size_t get_lower_bound(const K& key) {
- auto itr = FSTIter();
-
- const K temp_key = key;
-
- bool res;
- if constexpr (std::is_same_v<K, std::string>) {
- res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr);
- } else {
- res = m_fst.lowerBound(temp_key, itr);
- }
-
- return itr.value();
- }
-
- size_t get_upper_bound(const K& key) {
- auto itr = FSTIter();
-
- const K temp_key = key;
-
- bool res;
- if constexpr (std::is_same_v<K, std::string>) {
- res = m_fst.lowerBound(temp_key.c_str(), key.size(), itr);
- } else {
- res = m_fst.lowerBound(temp_key, itr);
- }
-
- size_t idx = itr.value();
- while (idx < m_reccnt && m_data[idx].rec.key <= key) {
- idx++;
- }
-
- return idx;
- }
+ size_t get_lower_bound(R &rec) {return 0;}
+ size_t get_upper_bound(R &rec) {return 0;}
private:
Wrapped<R>* m_data;
size_t m_reccnt;
size_t m_alloc_size;
- FST m_fst;
+ fst::Trie *m_fst;
};
}
diff --git a/tests/include/shard_string.h b/tests/include/shard_string.h
index 27ee782..fa51630 100644
--- a/tests/include/shard_string.h
+++ b/tests/include/shard_string.h
@@ -58,7 +58,7 @@ START_TEST(t_mbuffer_init)
START_TEST(t_shard_init)
{
- size_t n = 512;
+ size_t n = 2048;
auto mbuffer1 = create_test_mbuffer<R>(n);
auto mbuffer2 = create_test_mbuffer<R>(n);
auto mbuffer3 = create_test_mbuffer<R>(n);
@@ -117,16 +117,14 @@ START_TEST(t_point_lookup)
auto view = buffer->get_buffer_view();
for (size_t i=0; i<n; i++) {
- R r;
auto rec = view.get(i);
- r.key = rec->rec.key;
- r.value = rec->rec.value;
+ R r = rec->rec;
auto result = shard.point_lookup(r);
ck_assert_ptr_nonnull(result);
ck_assert_str_eq(result->rec.key.c_str(), r.key.c_str());
ck_assert_int_eq(result->rec.value, r.value);
- fprintf(stderr, "%ld\n", i);
+ //fprintf(stderr, "%ld\n", i);
}
}
diff --git a/tests/include/testing.h b/tests/include/testing.h
index a3c54c0..2315daa 100644
--- a/tests/include/testing.h
+++ b/tests/include/testing.h
@@ -51,7 +51,7 @@ static std::vector<StringRec> read_string_data(std::string fname, size_t n) {
r.value = atol(field.c_str());
std::getline(ls, field, '\n');
r.key = std::string(field);
-
+
vec.push_back(r);
}
@@ -116,8 +116,8 @@ static de::MutableBuffer<R> *create_test_mbuffer(size_t cnt)
auto buffer = new de::MutableBuffer<R>(cnt/2, cnt);
R rec;
- if constexpr (de::KVPInterface<R>) {
- if constexpr (std::is_same_v<R, StringRec>) {
+ if constexpr (de::KVPInterface<R>){
+ if constexpr (std::is_same_v<decltype(R::key), std::string>) {
auto records = read_string_data(kjv_wordlist, cnt);
for (size_t i=0; i<cnt; i++) {
if constexpr (de::WeightedRecordInterface<R>) {