/* * benchmarks/include/bench.h * * Copyright (C) 2023 Douglas Rumbaugh * * All rights reserved. Published under the Modified BSD License. * */ /* NOTE(review): this copy of bench.h appears to have lost its line breaks and all text between angle brackets: the first three #include directives have no target, template parameter lists are gone, and std::vector, std::unique_ptr, std::is_same_v and std::chrono::duration_cast have lost their template arguments. As shown it does not compile; restore from upstream before making code changes. The comments below describe only what is still visible. */ #pragma once #include #include #include #include "framework/DynamicExtension.h" #include "framework/interface/Query.h" #include "framework/reconstruction/FixedShardCountPolicy.h" #include "framework/reconstruction/ReconstructionPolicy.h" #include "query/irs.h" #include "psu-util/progress.h" #include "benchmark_types.h" #include "psu-util/bentley-saxe.h" #include "shard/ISAMTree.h" #include "framework/reconstruction/LevelingPolicy.h" #include "framework/reconstruction/TieringPolicy.h" #include "framework/reconstruction/BSMPolicy.h" #include "framework/reconstruction/FloodL0Policy.h" #include "framework/reconstruction/BackgroundTieringPolicy.h" /* Per-insert probability with which insert_records issues a delete (compared against gsl_rng_uniform). */ constexpr double delete_proportion = 0.05; /* Number of deletes issued by insert_records; incremented once per delete. NOTE(review): a static variable at namespace scope in a header gives every including translation unit its own copy; a C++17 inline variable would share a single one. */ static size_t g_deleted_records = 0; /* Result count of the most recent query executed by run_queries with BSM == true (assigned from res.size()). */ static size_t total = 0; /* Construct the reconstruction policy selected by the integer code policy and return it as an owning std::unique_ptr: 0 = TieringPolicy(scale_factor, buffer_size, modifier), 1 = LevelingPolicy(scale_factor, buffer_size, modifier), 2 = BSMPolicy(buffer_size, modifier), 3 = FloodL0Policy(buffer_size), 4 = FixedShardCountPolicy(buffer_size, scale_factor, reccnt) with assert(reccnt > 0), 5 = BackgroundTieringPolicy(scale_factor, buffer_size, modifier). NOTE(review): any other policy value leaves recon as nullptr and silently returns an empty pointer; case 4 passes buffer_size before scale_factor, unlike cases 0, 1 and 5 -- confirm this matches the FixedShardCountPolicy constructor. */ template Q> std::unique_ptr> get_policy(size_t scale_factor, size_t buffer_size, int policy=0, size_t reccnt=0, size_t modifier=0) { de::ReconstructionPolicy *recon = nullptr; if (policy == 0) { recon = new de::TieringPolicy(scale_factor, buffer_size, modifier); } else if (policy == 1) { recon = new de::LevelingPolicy(scale_factor, buffer_size, modifier); } else if (policy == 2) { recon = new de::BSMPolicy(buffer_size, modifier); } else if (policy == 3) { recon = new de::FloodL0Policy(buffer_size); } else if (policy == 4) { assert(reccnt > 0); recon = new de::FixedShardCountPolicy(buffer_size, scale_factor, reccnt); } else if (policy == 5) { recon = new de::BackgroundTieringPolicy(scale_factor, buffer_size, modifier); } return std::unique_ptr>(recon); } /* Execute every query in queries against extension, choosing the call at compile time with if constexpr and std::is_same_v (the compared types are missing from this copy). Two branches collect nearest neighbours into a local vector via get_nearest_by_limit(point, k). One branch walks forward from find(lower_bound) while ptr->first <= upper_bound. The default branch calls extension->query() on a moved copy of the query. When BSM is false the default branch calls get() on the returned object (presumably a future -- confirm); when BSM is true it stores res.size() in the global total. Results are printed to stdout only when BENCH_PRINT_RESULTS is defined; nothing is returned. */ template static void run_queries(DE *extension, std::vector &queries) { for (size_t i=0; i) { std::vector result; auto res = extension->get_nearest_by_limit(queries[i].point, queries[i].k); auto itr = res.begin(); while (itr != res.end()) { result.emplace_back(itr->data); itr++; } #ifdef BENCH_PRINT_RESULTS fprintf(stdout, "\n\n"); 
for (auto &r : result) { fprintf(stdout, "%ld %lf %lf %lf %lf %lf %lf\n", result.size(), r.data[0], r.data[1], r.data[2], r.data[3], r.data[4], r.data[5]); } #endif } else if constexpr (std::is_same_v) { std::vector result; auto res = extension->get_nearest_by_limit(queries[i].point, queries[i].k); auto itr = res.begin(); while (itr != res.end()) { result.emplace_back(itr->data); itr++; } }else if constexpr (std::is_same_v) { auto ptr = extension->find(queries[i].lower_bound); while (ptr != extension->end() && ptr->first <= queries[i].upper_bound) { ++ptr; } } else { auto q = queries[i]; auto res = extension->query(std::move(q)); if constexpr (!BSM) { [[maybe_unused]] auto result = res.get(); #ifdef BENCH_PRINT_RESULTS fprintf(stdout, "\n\n"); /* NOTE(review): this int i shadows the outer size_t loop index and narrows size() to int (same in the BSM branch below); %ld is not the portable printf specifier for size_t, %zu is. */ for (int i=result.size()-1; i>=0; i--) { auto &r = result[i]; fprintf(stdout, "%ld %lf %lf %lf %lf %lf %lf\n", result.size(), r.data[0], r.data[1], r.data[2], r.data[3], r.data[4], r.data[5]); } fflush(stdout); #endif } else { total = res.size(); #ifdef BENCH_PRINT_RESULTS fprintf(stdout, "\n\n"); for (int i=res.size()-1; i>=0; i--) { auto &r = res[i]; fprintf(stdout, "%ld %lf %lf %lf %lf %lf %lf\n", res.size(), r.data[0], r.data[1], r.data[2], r.data[3], r.data[4], r.data[5]); } fflush(stdout); #endif } } } } /* Run btree->range_sample(lower_bound, upper_bound, sample_size, sample_set, rng) for each query, reusing one sample_set buffer whose capacity is reserved from queries[0].sample_size. NOTE(review): queries[0] is read unconditionally, so an empty queries vector is undefined behaviour. */ template static void run_btree_queries(BenchBTree *btree, std::vector &queries) { std::vector sample_set; sample_set.reserve(queries[0].sample_size); for (size_t i=0; irange_sample(queries[i].lower_bound, queries[i].upper_bound, queries[i].sample_size, sample_set, queries[i].rng); } } /* Query a single static shard directly, without a dynamic extension. For each query it builds one local query with Q::local_preproc(shard, q), calls Q::distribute_query(q, local_queries, nullptr), then calls Q::local_query(shard, local_queries[0]). Results are printed only when BENCH_PRINT_RESULTS is defined. NOTE(review): the one-element shards vector is not used in the visible code. */ template static void run_static_queries(S *shard, std::vector &queries) { for (size_t i=0; i shards = {shard}; std::vector local_queries = {Q::local_preproc(shard, q)}; Q::distribute_query(q, local_queries, nullptr); [[maybe_unused]] auto res = Q::local_query(shard, local_queries[0]); #ifdef BENCH_PRINT_RESULTS fprintf(stdout, "\n\n"); for (int i=res.size()-1; i>=0; i--) { auto &r = res[i].rec; fprintf(stdout, "%ld %lf %lf %lf %lf %lf 
%lf\n", res.size(), r.data[0], r.data[1], r.data[2], r.data[3], r.data[4], r.data[5]); } fflush(stdout); #endif } } /* Insert records[i] into structure for i starting at start (the loop bound is missing from this copy; presumably up to stop). It picks insert, add or insert_or_assign by structure type; the default branch retries structure->insert() with usleep(1) until it returns true. When delete_records is true, each insert may be followed by a delete of records[to_delete[delete_idx]]. That happens with probability delete_proportion, and only if to_delete[delete_idx] <= i. Each delete advances the delete_idx reference and the global g_deleted_records. NOTE(review): delete_idx is never checked against to_delete.size() before to_delete[delete_idx] is read. Progress is reported only inside the retry loop, and as i / (stop - start) even though i starts at start. */ template static void insert_records(DE *structure, size_t start, size_t stop, std::vector &records, std::vector &to_delete, size_t &delete_idx, bool delete_records, gsl_rng *rng) { psudb::progress_update(0, "Insert Progress"); for (size_t i=start; i) { structure->insert(records[i]); } else if constexpr (std::is_same_v || std::is_same_v) { structure->add(records[i]); } else if constexpr (std::is_same_v) { structure->insert_or_assign(records[i].key, records[i].value); } else { while (!structure->insert(records[i])) { psudb::progress_update((double) i / (double)(stop - start), "Insert Progress"); usleep(1); } } if (delete_records && gsl_rng_uniform(rng) <= delete_proportion && to_delete[delete_idx] <= i) { if constexpr (std::is_same_v) { structure->erase_one(records[to_delete[delete_idx]].key); } else if constexpr (std::is_same_v || std::is_same_v) { structure->remove(records[to_delete[delete_idx]]); } else if constexpr (std::is_same_v) { structure->erase(records[to_delete[delete_idx]].key); } else { while (!structure->erase(records[to_delete[delete_idx]])) { usleep(1); } } delete_idx++; g_deleted_records++; } } psudb::progress_update(1, "Insert Progress"); } /* Measure combined insert plus delete throughput on de_index. Each round requests BATCH records from file with build_insert_vec, whose return value becomes continue_benchmark. While applied_deletes < delete_cnt (insert_cnt * delete_prop), it also refills delete_vec with build_delete_vec. Only the loop that applies the batch is timed. Prints (applied_inserts + applied_deletes) / total_time * 1e9 to stdout. That is presumably operations per second with total_time in nanoseconds; the duration_cast target is missing here. Returns the last continue_benchmark value. NOTE(review): if the first batch is empty the loop breaks before any timing and total_time stays 0. The division then yields infinity, whose conversion to size_t is undefined behaviour. %ld is not the portable specifier for size_t. */ template static bool insert_tput_bench(DE &de_index, std::fstream &file, size_t insert_cnt, double delete_prop, gsl_rng *rng, std::vector &to_delete, bool binary=false) { size_t delete_cnt = insert_cnt * delete_prop; size_t applied_deletes = 0; size_t applied_inserts = 0; std::vector insert_vec; std::vector delete_vec; insert_vec.reserve(BATCH); delete_vec.reserve(BATCH*delete_prop); size_t delete_idx = 0; bool continue_benchmark = true; size_t total_time = 0; while (applied_inserts < insert_cnt && continue_benchmark) { continue_benchmark = build_insert_vec(file, insert_vec, BATCH, delete_prop, to_delete, binary); if (applied_deletes < delete_cnt) { build_delete_vec(to_delete, delete_vec, 
BATCH*delete_prop); delete_idx = 0; } if (insert_vec.size() == 0) { break; } if constexpr (PROGRESS) { psudb::progress_update((double) applied_inserts / (double) insert_cnt, "inserting:"); } auto insert_start = std::chrono::high_resolution_clock::now(); for (size_t i=0; i) { de_index.erase_one(delete_vec[delete_idx++].key); #ifdef _GNU_SOURCE } else if constexpr (std::is_same_v || std::is_same_v) { de_index.remove(delete_vec[delete_idx++]); #endif } else { de_index.erase(delete_vec[delete_idx++]); } applied_deletes++; } // insert the record; #ifdef _GNU_SOURCE if constexpr (std::is_same_v || std::is_same_v) { de_index.add(insert_vec[i]); } else { de_index.insert(insert_vec[i]); } #else de_index.insert(insert_vec[i]); #endif applied_inserts++; } auto insert_stop = std::chrono::high_resolution_clock::now(); total_time += std::chrono::duration_cast(insert_stop - insert_start).count(); } if constexpr (PROGRESS) { psudb::progress_update(1.0, "inserting:"); } size_t throughput = (((double) (applied_inserts + applied_deletes) / (double) total_time) * 1e9); fprintf(stdout, "%ld\t", throughput); return continue_benchmark; } /* Measure mean query latency on de_index. Elapsed time is accumulated into total_time (the timed loop is mostly missing from this copy). The function prints total_time / (trial_cnt * queries.size()), presumably the mean time per query over trial_cnt passes, and always returns true. NOTE(review): progbuf is written only when PROGRESS is true, but progress_update(1.0, progbuf) runs unconditionally and reads an uninitialized buffer otherwise. An empty queries vector divides by zero. queries is taken by value and therefore copied, and total_results is unused in the visible code. */ template static bool query_latency_bench(DE &de_index, std::vector queries, size_t trial_cnt=1) { char progbuf[25]; if constexpr (PROGRESS) { sprintf(progbuf, "querying:"); } size_t total_time = 0; size_t total_results = 0; for (size_t i=0; i(stop - start).count(); } psudb::progress_update(1.0, progbuf); size_t query_latency = total_time / (trial_cnt * queries.size()); fprintf(stdout, "%ld\t", query_latency); fflush(stdout); return true; } /* Single-shard counterpart of query_latency_bench. It times queries against shard (trial_cnt defaults to 100; the timed loop is mostly missing from this copy), prints total_time / (trial_cnt * queries.size()) to stdout, and always returns true. NOTE(review): same issues as query_latency_bench: progbuf is uninitialized when PROGRESS is false yet passed to progress_update, and an empty queries vector divides by zero. */ template static bool static_latency_bench(Shard *shard, std::vector queries, size_t trial_cnt=100) { char progbuf[25]; if constexpr (PROGRESS) { sprintf(progbuf, "querying:"); } size_t total_time = 0; size_t total_results = 0; for (size_t i=0; i local_queries(1); auto start = std::chrono::high_resolution_clock::now(); for (size_t j=0; j(stop - start).count(); } psudb::progress_update(1.0, progbuf); size_t query_latency 
= total_time / (trial_cnt * queries.size()); fprintf(stdout, "%ld\t", query_latency); fflush(stdout); return true; }