diff options
| -rw-r--r-- | CMakeLists.txt | 10 | ||||
| -rw-r--r-- | benchmarks/include/bench.h | 6 | ||||
| -rw-r--r-- | benchmarks/include/bench_utility.h | 42 | ||||
| -rw-r--r-- | benchmarks/isam_irs_bench.cpp | 64 | ||||
| -rw-r--r-- | benchmarks/isam_rq_bench.cpp | 59 | ||||
| -rw-r--r-- | benchmarks/pgm_rq_bench.cpp | 6 | ||||
| -rw-r--r-- | include/shard/MemISAM.h | 126 |
7 files changed, 296 insertions, 17 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index a7b2f8a..60f5c9b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -105,4 +105,14 @@ if (bench) target_link_libraries(pgm_rq_bench PUBLIC gsl pthread gomp) target_include_directories(pgm_rq_bench PRIVATE include external/PGM-index/include external/PLEX/include bench/include) target_compile_options(pgm_rq_bench PUBLIC -fopenmp) + + add_executable(isam_irs_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/isam_irs_bench.cpp) + target_link_libraries(isam_irs_bench PUBLIC gsl pthread gomp) + target_include_directories(isam_irs_bench PRIVATE include external/PGM-index/include external/PLEX/include bench/include) + target_compile_options(isam_irs_bench PUBLIC -fopenmp) + + add_executable(isam_rq_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/isam_rq_bench.cpp) + target_link_libraries(isam_rq_bench PUBLIC gsl pthread gomp) + target_include_directories(isam_rq_bench PRIVATE include external/PGM-index/include external/PLEX/include bench/include) + target_compile_options(isam_rq_bench PUBLIC -fopenmp) endif() diff --git a/benchmarks/include/bench.h b/benchmarks/include/bench.h index 3e1c6b2..e0f4c1d 100644 --- a/benchmarks/include/bench.h +++ b/benchmarks/include/bench.h @@ -12,7 +12,7 @@ template <typename DE, de::RecordInterface R, bool PROGRESS=true, size_t BATCH=1000> static bool insert_tput_bench(DE &de_index, std::fstream &file, size_t insert_cnt, - double delete_prop, std::vector<R> &to_delete) { + double delete_prop, std::vector<R> &to_delete, bool binary=false) { size_t delete_cnt = insert_cnt * delete_prop; @@ -31,7 +31,7 @@ static bool insert_tput_bench(DE &de_index, std::fstream &file, size_t insert_cn size_t total_time = 0; while (applied_inserts < insert_cnt && continue_benchmark) { - continue_benchmark = build_insert_vec(file, insert_vec, BATCH, delete_prop, to_delete); + continue_benchmark = build_insert_vec(file, insert_vec, BATCH, delete_prop, to_delete, binary); if (applied_deletes < delete_cnt) { build_delete_vec(to_delete, delete_vec, BATCH*delete_prop); delete_idx = 0; @@ -104,7 +104,6 @@ static bool query_latency_bench(DE &de_index, std::vector<QP> queries, size_t tr size_t query_latency = total_time / (trial_cnt * queries.size()); fprintf(stdout, "%ld\t", query_latency); - fprintf(stderr, "%ld\n", total_results); fflush(stdout); return true; @@ -146,7 +145,6 @@ static bool static_latency_bench(Shard *shard, std::vector<QP> queries, size_t t size_t query_latency = total_time / (trial_cnt * queries.size()); fprintf(stdout, "%ld\t", query_latency); - fprintf(stderr, "%ld\n", total_results); fflush(stdout); return true; diff --git a/benchmarks/include/bench_utility.h b/benchmarks/include/bench_utility.h index 2d31cae..a5f5e0b 100644 --- a/benchmarks/include/bench_utility.h +++ b/benchmarks/include/bench_utility.h @@ -30,7 +30,7 @@ #include <random> typedef uint64_t key_type; -typedef uint32_t value_type; +typedef uint64_t value_type; typedef uint64_t weight_type; typedef de::WeightedRecord<key_type, value_type, weight_type> WRec; @@ -39,6 +39,8 @@ typedef de::Record<key_type, value_type> Rec; typedef de::DynamicExtension<WRec, de::WSS<WRec>, de::WSSQuery<WRec>> ExtendedWSS; typedef de::DynamicExtension<Rec, de::TrieSpline<Rec>, de::TrieSplineRangeQuery<Rec>> ExtendedTSRQ; typedef de::DynamicExtension<Rec, de::PGM<Rec>, de::PGMRangeQuery<Rec>> ExtendedPGMRQ; +typedef de::DynamicExtension<Rec, de::MemISAM<Rec>, de::IRSQuery<Rec>> ExtendedISAM_IRS; +typedef de::DynamicExtension<Rec, de::MemISAM<Rec>, de::ISAMRangeQuery<Rec>> ExtendedISAM_RQ; static gsl_rng *g_rng; static std::set<WRec> *g_to_delete; @@ -115,11 +117,36 @@ static std::vector<QP> read_range_queries(std::string fname, double selectivity) return queries; } -template <de::RecordInterface R> -static bool next_record(std::fstream &file, R &record) +template <de::KVPInterface R> +static bool next_record(std::fstream &file, R &record, bool binary=false) { + static value_type value = 1; if (g_reccnt >= g_max_record_cnt) return false; + if (binary) { + if (file.good()) { + decltype(R::key) key; + + file.read((char*) &key, sizeof(key)); + record.key = key; + record.value = value; + value++; + + if constexpr (de::WeightedRecordInterface<R>) { + decltype(R::weight) weight; + file.read((char*) &weight, sizeof(weight)); + record.weight = weight; + } + + if (record.key < g_min_key) g_min_key = record.key; + if (record.key > g_max_key) g_max_key = record.key; + + return true; + } + + return false; + } + std::string line; if (std::getline(file, line, '\n')) { std::stringstream line_stream(line); @@ -139,7 +166,6 @@ static bool next_record(std::fstream &file, R &record) } if (record.key < g_min_key) g_min_key = record.key; - if (record.key > g_max_key) g_max_key = record.key; g_reccnt++; @@ -152,11 +178,11 @@ static bool next_record(std::fstream &file, R &record) template <de::RecordInterface R> static bool build_insert_vec(std::fstream &file, std::vector<R> &vec, size_t n, - double delete_prop, std::vector<R> &to_delete) { + double delete_prop, std::vector<R> &to_delete, bool binary=false) { vec.clear(); for (size_t i=0; i<n; i++) { R rec; - if (!next_record(file, rec)) { + if (!next_record(file, rec, binary)) { if (i == 0) { return false; } @@ -210,7 +236,7 @@ static void progress_update(double percentage, std::string prompt) { template <typename DE, de::RecordInterface R> static bool warmup(std::fstream &file, DE &extended_index, size_t count, - double delete_prop, std::vector<R> to_delete, bool progress=true) { + double delete_prop, std::vector<R> to_delete, bool progress=true, bool binary=false) { size_t batch = std::min(.1 * count, 25000.0); std::vector<R> insert_vec; @@ -224,7 +250,7 @@ static bool warmup(std::fstream &file, DE &extended_index, size_t count, double last_percent = 0; while (inserted < count) { // Build vector of records to insert and potentially delete - auto continue_warmup = build_insert_vec(file, insert_vec, batch, delete_prop, to_delete); + auto continue_warmup = build_insert_vec(file, insert_vec, batch, delete_prop, to_delete, binary); if (inserted > batch) { build_delete_vec(to_delete, delete_vec, batch*delete_prop); delete_idx = 0; diff --git a/benchmarks/isam_irs_bench.cpp b/benchmarks/isam_irs_bench.cpp new file mode 100644 index 0000000..96525f0 --- /dev/null +++ b/benchmarks/isam_irs_bench.cpp @@ -0,0 +1,64 @@ +#include "include/bench.h" + +int main(int argc, char **argv) +{ + if (argc < 5) { + fprintf(stderr, "Usage: isam_irs_bench <filename> <record_count> <delete_proportion> <query_file>\n"); + exit(EXIT_FAILURE); + } + + std::string filename = std::string(argv[1]); + size_t record_count = atol(argv[2]); + double delete_prop = atof(argv[3]); + std::string qfilename = std::string(argv[4]); + + size_t buffer_cap = 12000; + size_t scale_factor = 6; + double max_delete_prop = delete_prop; + bool use_osm = false; + + double insert_batch = 0.1; + + init_bench_env(record_count, true, use_osm); + auto queries = read_range_queries<de::irs_query_parms<Rec>>(qfilename, .001); + + for (auto &q: queries) { + q.rng = g_rng; + q.sample_size = 1000; + } + + auto de_irs = ExtendedISAM_IRS(buffer_cap, scale_factor, max_delete_prop); + + std::fstream datafile; + datafile.open(filename, std::ios::in | std::ios::binary); + + std::vector<Rec> to_delete; + + // warm up the tree with initial_insertions number of initially inserted + // records + size_t warmup_cnt = insert_batch * record_count; + warmup<ExtendedISAM_IRS, Rec>(datafile, de_irs, warmup_cnt, delete_prop, to_delete, true, true); + + size_t insert_cnt = record_count - warmup_cnt; + + insert_tput_bench<ExtendedISAM_IRS, Rec>(de_irs, datafile, insert_cnt, delete_prop, to_delete, true); + fprintf(stdout, "%ld\t", de_irs.get_memory_usage()); + query_latency_bench<ExtendedISAM_IRS, Rec, de::irs_query_parms<Rec>>(de_irs, queries); + fprintf(stdout, "\n"); + + auto ts = de_irs.create_static_structure(); + + fprintf(stdout, "%ld\t", ts->get_memory_usage()); + static_latency_bench<de::MemISAM<Rec>, Rec, de::irs_query_parms<Rec>, de::IRSQuery<Rec>>( + ts, queries, 1 + ); + fprintf(stdout, "\n"); + + delete ts; + + delete_bench_env(); + fflush(stdout); + fflush(stderr); + + exit(EXIT_SUCCESS); +} diff --git a/benchmarks/isam_rq_bench.cpp b/benchmarks/isam_rq_bench.cpp new file mode 100644 index 0000000..bb5626e --- /dev/null +++ b/benchmarks/isam_rq_bench.cpp @@ -0,0 +1,59 @@ +#include "include/bench.h" + +int main(int argc, char **argv) +{ + if (argc < 5) { + fprintf(stderr, "Usage: isam_rq_bench <filename> <record_count> <delete_proportion> <query_file>\n"); + exit(EXIT_FAILURE); + } + + std::string filename = std::string(argv[1]); + size_t record_count = atol(argv[2]); + double delete_prop = atof(argv[3]); + std::string qfilename = std::string(argv[4]); + + size_t buffer_cap = 12000; + size_t scale_factor = 6; + double max_delete_prop = delete_prop; + bool use_osm = false; + + double insert_batch = 0.1; + + init_bench_env(record_count, true, use_osm); + auto queries = read_range_queries<de::ISAMRangeQueryParms<Rec>>(qfilename, .0001); + + auto de_isam_rq = ExtendedISAM_RQ(buffer_cap, scale_factor, max_delete_prop); + + std::fstream datafile; + datafile.open(filename, std::ios::in | std::ios::binary); + + std::vector<Rec> to_delete; + + // warm up the tree with initial_insertions number of initially inserted + // records + size_t warmup_cnt = insert_batch * record_count; + warmup<ExtendedISAM_RQ, Rec>(datafile, de_isam_rq, warmup_cnt, delete_prop, to_delete, true, true); + + size_t insert_cnt = record_count - warmup_cnt; + + insert_tput_bench<ExtendedISAM_RQ, Rec>(de_isam_rq, datafile, insert_cnt, delete_prop, to_delete, true); + fprintf(stdout, "%ld\t", de_isam_rq.get_memory_usage()); + query_latency_bench<ExtendedISAM_RQ, Rec, de::ISAMRangeQueryParms<Rec>>(de_isam_rq, queries); + fprintf(stdout, "\n"); + + auto ts = de_isam_rq.create_static_structure(); + + fprintf(stdout, "%ld\t", ts->get_memory_usage()); + static_latency_bench<de::MemISAM<Rec>, Rec, de::ISAMRangeQueryParms<Rec>, de::ISAMRangeQuery<Rec>>( + ts, queries, 1 + ); + fprintf(stdout, "\n"); + + delete ts; + + delete_bench_env(); + fflush(stdout); + fflush(stderr); + + exit(EXIT_SUCCESS); +} diff --git a/benchmarks/pgm_rq_bench.cpp b/benchmarks/pgm_rq_bench.cpp index aac6e07..3acc34e 100644 --- a/benchmarks/pgm_rq_bench.cpp +++ b/benchmarks/pgm_rq_bench.cpp @@ -32,18 +32,18 @@ int main(int argc, char **argv) auto queries = read_range_queries<de::pgm_range_query_parms<Rec>>(query_file, .0001); std::fstream datafile; - datafile.open(filename, std::ios::in); + datafile.open(filename, std::ios::in | std::ios::binary); std::vector<Rec> to_delete; // warm up the tree with initial_insertions number of initially inserted // records size_t warmup_cnt = insert_batch * record_count; - warmup<ExtendedPGMRQ, Rec>(datafile, de, warmup_cnt, delete_prop, to_delete); + warmup<ExtendedPGMRQ, Rec>(datafile, de, warmup_cnt, delete_prop, to_delete, true, true); size_t insert_cnt = record_count - warmup_cnt; - insert_tput_bench<ExtendedPGMRQ, Rec>(de, datafile, insert_cnt, delete_prop, to_delete); + insert_tput_bench<ExtendedPGMRQ, Rec>(de, datafile, insert_cnt, delete_prop, to_delete, true); fprintf(stdout, "%ld\t", de.get_memory_usage()); query_latency_bench<ExtendedPGMRQ, Rec, de::pgm_range_query_parms<Rec>>(de, queries, 1); diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index aa31962..897193c 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -40,6 +40,7 @@ struct IRSState { size_t lower_bound; size_t upper_bound; size_t sample_size; + size_t total_weight; }; template <RecordInterface R> @@ -49,12 +50,32 @@ struct IRSBufferState { size_t sample_size; }; +template <RecordInterface R> +struct ISAMRangeQueryParms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; +}; + +template <RecordInterface R> +class ISAMRangeQuery; + +template <RecordInterface R> +struct ISAMRangeQueryState { + size_t start_idx; + size_t stop_idx; +}; + +template <RecordInterface R> +struct RangeQueryBufferState { + size_t cutoff; +}; template <RecordInterface R> class MemISAM { private: friend class IRSQuery<R, true>; friend class IRSQuery<R, false>; + friend class ISAMRangeQuery<R>; typedef decltype(R::key) K; typedef decltype(R::value) V; @@ -233,7 +254,7 @@ public: } size_t get_memory_usage() { - return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size; + return m_internal_node_cnt * inmem_isam_node_size; } private: @@ -404,7 +425,7 @@ public: weights.push_back(bs->records.size()); } - decltype(R::weight) total_weight = 0; + size_t total_weight = 0; for (auto &s : shard_states) { auto state = (IRSState<R> *) s; total_weight += state->upper_bound - state->lower_bound; @@ -509,4 +530,105 @@ public: } }; + +template <RecordInterface R> +class ISAMRangeQuery { +public: + static void *get_query_state(MemISAM<R> *ts, void *parms) { + auto res = new ISAMRangeQueryState<R>(); + auto p = (ISAMRangeQueryParms<R> *) parms; + + res->start_idx = ts->get_lower_bound(p->lower_bound); + res->stop_idx = ts->get_record_count(); + + return res; + } + + static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) { + auto res = new RangeQueryBufferState<R>(); + res->cutoff = buffer->get_record_count(); + + return res; + } + + static void process_query_states(void *query_parms, std::vector<void*> shard_states, void *buff_state) { + return; + } + + static std::vector<Wrapped<R>> query(MemISAM<R> *ts, void *q_state, void *parms) { + std::vector<Wrapped<R>> records; + auto p = (ISAMRangeQueryParms<R> *) parms; + auto s = (ISAMRangeQueryState<R> *) q_state; + + // if the returned index is one past the end of the + // records for the PGM, then there are not records + // in the index falling into the specified range. + if (s->start_idx == ts->get_record_count()) { + return records; + } + + auto ptr = ts->get_record_at(s->start_idx); + + // roll the pointer forward to the first record that is + // greater than or equal to the lower bound. + while(ptr->rec.key < p->lower_bound) { + ptr++; + } + + while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) { + records.emplace_back(*ptr); + ptr++; + } + + return records; + } + + static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) { + auto p = (ISAMRangeQueryParms<R> *) parms; + auto s = (RangeQueryBufferState<R> *) state; + + std::vector<Wrapped<R>> records; + for (size_t i=0; i<s->cutoff; i++) { + auto rec = buffer->get_data() + i; + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { + records.emplace_back(*rec); + } + } + + return records; + } + + static std::vector<R> merge(std::vector<std::vector<R>> &results, void *parms) { + size_t total = 0; + for (size_t i=0; i<results.size(); i++) { + total += results[i].size(); + } + + if (total == 0) { + return std::vector<R>(); + } + + std::vector<R> output; + output.reserve(total); + + for (size_t i=0; i<results.size(); i++) { + std::move(results[i].begin(), results[i].end(), std::back_inserter(output)); + } + + return output; + } + + static void delete_query_state(void *state) { + auto s = (ISAMRangeQueryState<R> *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (RangeQueryBufferState<R> *) state; + delete s; + } +}; + + + } |