diff options
| -rw-r--r-- | CMakeLists.txt | 34 | ||||
| -rw-r--r-- | benchmarks/include/standard_benchmarks.h | 6 | ||||
| -rw-r--r-- | benchmarks/vldb/ts_parmsweep.cpp | 2 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 21 | ||||
| -rw-r--r-- | include/framework/interface/Query.h | 184 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 | ||||
| -rw-r--r-- | include/query/irs.h | 41 | ||||
| -rw-r--r-- | include/query/knn.h | 42 | ||||
| -rw-r--r-- | include/query/pointlookup.h | 19 | ||||
| -rw-r--r-- | include/query/rangecount.h | 37 | ||||
| -rw-r--r-- | include/query/rangequery.h | 39 | ||||
| -rw-r--r-- | include/query/wss.h | 28 | ||||
| -rw-r--r-- | tests/include/irs.h | 37 | ||||
| -rw-r--r-- | tests/include/pointlookup.h | 14 | ||||
| -rw-r--r-- | tests/include/rangecount.h | 18 | ||||
| -rw-r--r-- | tests/include/rangequery.h | 40 | ||||
| -rw-r--r-- | tests/include/shard_standard.h | 14 | ||||
| -rw-r--r-- | tests/include/testing.h | 308 | ||||
| -rw-r--r-- | tests/mutable_buffer_tests.cpp | 23 | ||||
| -rw-r--r-- | tests/vptree_tests.cpp | 18 |
20 files changed, 459 insertions, 468 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index b185b0f..0c1aaa1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,12 +14,15 @@ set(tests True) set(bench false) set(vldb_bench true) +# ALEX doesn't build under C++20 +set(build_alex false) + set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin") set(CMAKE_CXX_FLAGS=-latomic -mcx16) -add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal -mcx16 -march=native) # -fconcepts-diagnostics-depth=3) +add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal -mcx16 -march=native -Werror) # -fconcepts-diagnostics-depth=3) find_package(OpenMP REQUIRED) add_compile_options(${OpenMP_CXX_FLAGS}) @@ -130,11 +133,6 @@ if (tests) target_link_options(alias_tests PUBLIC -mcx16) target_include_directories(alias_tests PRIVATE include external/psudb-common/cpp/include) - add_executable(triespline_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_tests.cpp) - target_link_libraries(triespline_tests PUBLIC gsl check subunit pthread atomic) - target_link_options(triespline_tests PUBLIC -mcx16) - target_include_directories(triespline_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include) - add_executable(pgm_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/pgm_tests.cpp) target_link_libraries(pgm_tests PUBLIC gsl check subunit pthread gomp atomic) target_include_directories(pgm_tests PRIVATE include external/PGM-index/include external/psudb-common/cpp/include) @@ -144,6 +142,11 @@ if (tests) # Triespline code doesn't build under OpenBSD due to ambiguous function call; # this is likely a difference between gcc and clang, rather than an OS thing if (NOT BSD) + add_executable(triespline_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_tests.cpp) + target_link_libraries(triespline_tests PUBLIC gsl check subunit pthread atomic) + target_link_options(triespline_tests PUBLIC -mcx16) + target_include_directories(triespline_tests PRIVATE include external/psudb-common/cpp/include external/PLEX/include) + add_executable(triespline_debug ${CMAKE_CURRENT_SOURCE_DIR}/tests/triespline_debug.cpp) target_link_libraries(triespline_debug PUBLIC gsl check subunit pthread atomic) target_link_options(triespline_debug PUBLIC -mcx16) @@ -227,12 +230,6 @@ if (vldb_bench) target_link_options(ts_bsm_bench PUBLIC -mcx16) target_compile_options(ts_bsm_bench PUBLIC) - #add_executable(ts_mdsp_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/vldb/ts_mdsp_bench.cpp) - #target_link_libraries(ts_mdsp_bench PUBLIC gsl pthread atomic) - #target_include_directories(ts_mdsp_bench PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) - #target_link_options(ts_mdsp_bench PUBLIC -mcx16) - #target_compile_options(ts_mdsp_bench PUBLIC) - add_executable(pgm_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/vldb/pgm_bench.cpp) target_link_libraries(pgm_bench PUBLIC gsl pthread atomic gomp) target_include_directories(pgm_bench PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) @@ -251,11 +248,13 @@ if (vldb_bench) target_link_options(btree_bench PUBLIC -mcx16) target_compile_options(btree_bench PUBLIC) - add_executable(alex_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/vldb/alex_bench.cpp) - target_link_libraries(alex_bench PUBLIC gsl ) - target_include_directories(alex_bench PRIVATE external/psudb-common/cpp/include external/alex/src/core/ benchmarks/include) - target_compile_options(alex_bench PUBLIC) - set_property(TARGET alex_bench PROPERTY CXX_STANDARD 14) + if (build_alex) + add_executable(alex_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/vldb/alex_bench.cpp) + target_link_libraries(alex_bench PUBLIC gsl ) + target_include_directories(alex_bench PRIVATE external/psudb-common/cpp/include external/alex/src/core/ benchmarks/include) + target_compile_options(alex_bench PUBLIC) + set_property(TARGET alex_bench PROPERTY CXX_STANDARD 14) + endif() add_executable(mtree_bench_alt ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/vldb/mtree_bench_alt.cpp) target_link_libraries(mtree_bench_alt PUBLIC gsl pthread atomic gomp) @@ -281,7 +280,6 @@ if (vldb_bench) target_link_options(thread_scaling_bench PUBLIC -mcx16) target_compile_options(thread_scaling_bench PUBLIC) - add_executable(btree_thread_scaling_bench ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/vldb/btree_thread_scaling_bench.cpp) target_link_libraries(btree_thread_scaling_bench PUBLIC gsl pthread atomic) diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h index 797b0c5..d3408c1 100644 --- a/benchmarks/include/standard_benchmarks.h +++ b/benchmarks/include/standard_benchmarks.h @@ -55,17 +55,15 @@ static void run_queries(DE *extension, std::vector<typename Q::Parameters> &quer itr++; } }else if constexpr (std::is_same_v<PGM, DE>) { - size_t tot = 0; auto ptr = extension->find(queries[i].lower_bound); while (ptr != extension->end() && ptr->first <= queries[i].upper_bound) { - tot++; ++ptr; } } else { auto q = queries[i]; auto res = extension->query(std::move(q)); if constexpr (!BSM) { - auto result = res.get(); + [[maybe_unused]] auto result = res.get(); #ifdef BENCH_PRINT_RESULTS fprintf(stdout, "\n\n"); for (int i=result.size()-1; i>=0; i--) { @@ -111,7 +109,7 @@ static void run_static_queries(S *shard, std::vector<typename Q::Parameters> &qu std::vector<typename Q::LocalQuery*> local_queries = {Q::local_preproc(shard, q)}; Q::distribute_query(q, local_queries, nullptr); - auto res = Q::local_query(shard, local_queries[0]); + [[maybe_unused]] auto res = Q::local_query(shard, local_queries[0]); #ifdef BENCH_PRINT_RESULTS fprintf(stdout, "\n\n"); diff --git a/benchmarks/vldb/ts_parmsweep.cpp b/benchmarks/vldb/ts_parmsweep.cpp index a9203ab..0141c53 100644 --- a/benchmarks/vldb/ts_parmsweep.cpp +++ b/benchmarks/vldb/ts_parmsweep.cpp @@ -18,7 +18,7 @@ typedef de::Record<uint64_t, uint64_t> Rec; typedef de::TrieSpline<Rec> Shard; -typedef de::rc::Query<Shard, true> Q; +typedef de::rc::Query<Shard> Q; typedef de::DynamicExtension<Shard, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; typedef de::DynamicExtension<Shard, Q, de::LayoutPolicy::LEVELING, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext2; typedef Q::Parameters QP; diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5a95679..719232e 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -202,7 +202,7 @@ public: * @return A future, from which the query results can be retrieved upon * query completion */ - std::future<std::vector<QueryResult>> + std::future<QueryResult> query(Parameters &&parms) { return schedule_query(std::move(parms)); } @@ -628,26 +628,17 @@ private: QueryType::distribute_query(parms, local_queries, buffer_query); /* execute the local/buffer queries and combine the results into output */ - std::vector<QueryResult> output; + QueryResult output; do { - std::vector<std::vector<LocalResult>> - query_results(shards.size() + 1); + std::vector<LocalResult> query_results(shards.size() + 1); for (size_t i = 0; i < query_results.size(); i++) { - std::vector<LocalResult> local_results; - ShardID shid; - if (i == 0) { /* execute buffer query */ - local_results = QueryType::local_query_buffer(buffer_query); - shid = INVALID_SHID; + query_results[i] = QueryType::local_query_buffer(buffer_query); } else { /*execute local queries */ - local_results = QueryType::local_query(shards[i - 1].second, + query_results[i] = QueryType::local_query(shards[i - 1].second, local_queries[i - 1]); - shid = shards[i - 1].first; } - /* framework-level, automatic delete filtering */ - query_results[i] = std::move(local_results); - /* end query early if EARLY_ABORT is set and a result exists */ if constexpr (QueryType::EARLY_ABORT) { if (query_results[i].size() > 0) @@ -695,7 +686,7 @@ private: m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION); } - std::future<std::vector<QueryResult>> + std::future<QueryResult> schedule_query(Parameters &&query_parms) { auto args = new QueryArgs<ShardType, QueryType, DynamicExtension>(); diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 1b64646..97a973d 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -12,18 +12,6 @@ namespace de { -/* - * FIXME: It would probably be best to absorb the std::vector into - * this type too; this would allow user-defined collections for - * intermediate results, which could allow for more merging - * optimizations. However, this would require an alternative - * approach to doing delete checks, so we'll leave it for now. - */ -template <typename R> -concept LocalResultInterface = requires(R res) { - { res.is_deleted() } -> std::convertible_to<bool>; - { res.is_tombstone() } -> std::convertible_to<bool>; -}; /* * @@ -35,102 +23,100 @@ template <typename QUERY, typename SHARD, typename PARAMETERS = typename QUERY::Parameters, typename LOCAL = typename QUERY::LocalQuery, typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer> -concept QueryInterface = LocalResultInterface<LOCAL_RESULT> && +concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query, SHARD *shard, std::vector<LOCAL *> &local_queries, - std::vector<std::vector<LOCAL_RESULT>> &local_results, - std::vector<RESULT> &result, + std::vector<LOCAL_RESULT> &local_results, RESULT &result, BufferView<typename SHARD::RECORD> *bv) { + /* + * Given a set of query parameters and a shard, return a local query + * object for that shard. + */ + { + QUERY::local_preproc(shard, parameters) + } -> std::convertible_to<LOCAL *>; - /* - * Given a set of query parameters and a shard, return a local query - * object for that shard. - */ - { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>; - - /* - * Given a set of query parameters and a buffer view, return a local - * query object for the buffer. - * NOTE: for interface reasons, the pointer to the buffer view MUST be - * stored inside of the local query object. The future buffer - * query routine will access the buffer by way of this pointer. - */ - { - QUERY::local_preproc_buffer(bv, parameters) - } -> std::convertible_to<LOCAL_BUFFER *>; + /* + * Given a set of query parameters and a buffer view, return a local + * query object for the buffer. + * NOTE: for interface reasons, the pointer to the buffer view MUST be + * stored inside of the local query object. The future buffer + * query routine will access the buffer by way of this pointer. + */ + { + QUERY::local_preproc_buffer(bv, parameters) + } -> std::convertible_to<LOCAL_BUFFER *>; - /* - * Given a full set of local queries, and the buffer query, make any - * necessary adjustments to the local queries in-place, to account for - * global information. If no additional processing is required, this - * function can be left empty. - */ - {QUERY::distribute_query(parameters, local_queries, buffer_query)}; + /* + * Given a full set of local queries, and the buffer query, make any + * necessary adjustments to the local queries in-place, to account for + * global information. If no additional processing is required, this + * function can be left empty. + */ + { QUERY::distribute_query(parameters, local_queries, buffer_query) }; - /* - * Answer the local query, defined by `local` against `shard` and return - * a vector of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query(shard, local) - } -> std::convertible_to<std::vector<LOCAL_RESULT>>; + /* + * Answer the local query, defined by `local` against `shard` and return + * a vector of LOCAL_RESULT objects defining the query result. + */ + { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>; - /* - * Answer the local query defined by `local` against the buffer (which - * should be accessed by a pointer inside of `local`) and return a vector - * of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query_buffer(buffer_query) - } -> std::convertible_to<std::vector<LOCAL_RESULT>>; + /* + * Answer the local query defined by `local` against the buffer (which + * should be accessed by a pointer inside of `local`) and return a vector + * of LOCAL_RESULT objects defining the query result. + */ + { + QUERY::local_query_buffer(buffer_query) + } -> std::convertible_to<LOCAL_RESULT>; - /* - * Process the local results from the buffer and all of the shards, - * stored in `local_results`, and insert the associated ResultType - * objects into the `result` vector, which represents the final result - * of the query. Updates to this vector are done in-place. - */ - {QUERY::combine(local_results, parameters, result)}; + /* + * Process the local results from the buffer and all of the shards, + * stored in `local_results`, and insert the associated ResultType + * objects into the `result` vector, which represents the final result + * of the query. Updates to this vector are done in-place. + */ + { QUERY::combine(local_results, parameters, result) }; - /* - * Process the post-combine `result` vector of ResultType objects, - * in the context of the global and local query parameters, to determine - * if the query should be repeated. If so, make any necessary adjustments - * to the local query objects and return True. Otherwise, return False. - * - * If no repetition is needed for a given problem type, simply return - * False immediately and the query will end. - */ - { - QUERY::repeat(parameters, result, local_queries, buffer_query) - } -> std::same_as<bool>; + /* + * Process the post-combine `result` vector of ResultType objects, + * in the context of the global and local query parameters, to determine + * if the query should be repeated. If so, make any necessary adjustments + * to the local query objects and return True. Otherwise, return False. + * + * If no repetition is needed for a given problem type, simply return + * False immediately and the query will end. + */ + { + QUERY::repeat(parameters, result, local_queries, buffer_query) + } -> std::same_as<bool>; - /* - * If this flag is True, then the query will immediately stop and return - * a result as soon as the first non-deleted LocalRecordType is found. - * Otherwise, every Shard and the buffer will be queried and the results - * merged, like normal. - * - * This is largely an optimization flag for use with point-lookup, or - * other single-record result queries - */ - { QUERY::EARLY_ABORT } -> std::convertible_to<bool>; + /* + * If this flag is True, then the query will immediately stop and return + * a result as soon as the first non-deleted LocalRecordType is found. + * Otherwise, every Shard and the buffer will be queried and the results + * merged, like normal. + * + * This is largely an optimization flag for use with point-lookup, or + * other single-record result queries + */ + { QUERY::EARLY_ABORT } -> std::convertible_to<bool>; - /* - * If false, the built-in delete filtering that the framework can - * apply to the local results, prior to calling combine, will be skipped. - * This general filtering can be inefficient, particularly for tombstone - * -based deletes, and so if a more efficient manual filtering can be - * performed, it is worth setting this to True and doing that filtering - * in the combine step. - * - * If deletes are not a consideration for your problem, it's also best - * to turn this off, as it'll avoid the framework making an extra pass - * over the local results prior to combining them. - * - * TODO: Temporarily disabling this, as we've dropped framework-level - * delete filtering for the time being. - */ - /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */ -}; + /* + * If false, the built-in delete filtering that the framework can + * apply to the local results, prior to calling combine, will be skipped. + * This general filtering can be inefficient, particularly for tombstone + * -based deletes, and so if a more efficient manual filtering can be + * performed, it is worth setting this to True and doing that filtering + * in the combine step. + * + * If deletes are not a consideration for your problem, it's also best + * to turn this off, as it'll avoid the framework making an extra pass + * over the local results prior to combining them. + * + * TODO: Temporarily disabling this, as we've dropped framework-level + * delete filtering for the time being. + */ + /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */ + }; } // namespace de diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 6b6f040..34f053a 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -35,7 +35,7 @@ struct ReconstructionArgs { }; template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs { - std::promise<std::vector<typename Q::ResultType>> result_set; + std::promise<typename Q::ResultType> result_set; typename Q::Parameters query_parms; DE *extension; }; diff --git a/include/query/irs.h b/include/query/irs.h index 6dec850..ec6fa29 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -40,15 +40,15 @@ public: BufferView<R> *buffer; size_t cutoff; - std::vector<Wrapped<R>> records; + std::vector<R> records; std::unique_ptr<psudb::Alias> alias; size_t sample_size; Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; @@ -86,9 +86,11 @@ public: } for (size_t i = 0; i < query->cutoff; i++) { - if ((query->buffer->get(i)->rec.key >= query->global_parms.lower_bound) && - (buffer->get(i)->rec.key <= query->global_parms.upper_bound)) { - query->records.emplace_back(*(query->buffer->get(i))); + auto rec = buffer->get(i); + if (rec->rec.key >= query->global_parms.lower_bound && + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + query->records.emplace_back(query->buffer->get(i)->rec); } } @@ -163,10 +165,10 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { + static LocalResultType local_query(S *shard, LocalQuery *query) { auto sample_sz = query->sample_size; - std::vector<LocalResultType> result_set; + LocalResultType result_set; if (sample_sz == 0 || query->lower_idx == shard->get_record_count()) { return result_set; @@ -180,15 +182,19 @@ public: (range_length > 0) ? gsl_rng_uniform_int(query->global_parms.rng, range_length) : 0; - result_set.emplace_back(*shard->get_record_at(query->lower_idx + idx)); + auto wrec = shard->get_record_at(query->lower_idx + idx); + + if (!wrec->is_deleted() && !wrec->is_tombstone()) { + result_set.emplace_back(wrec->rec); + } } while (attempts < sample_sz); return result_set; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; result.reserve(query->sample_size); if constexpr (REJECTION) { @@ -197,8 +203,9 @@ public: auto rec = query->buffer->get(idx); if (rec->rec.key >= query->global_parms.lower_bound && - rec->rec.key <= query->global_parms.upper_bound) { - result.emplace_back(*rec); + rec->rec.key <= query->global_parms.upper_bound && + (!rec->is_deleted() && !rec->is_tombstone())) { + result.emplace_back(rec->rec); } } @@ -215,16 +222,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/include/query/knn.h b/include/query/knn.h index 87ea10a..91a032c 100644 --- a/include/query/knn.h +++ b/include/query/knn.h @@ -39,8 +39,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<const Wrapped<R> *> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -66,8 +66,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> results; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -79,17 +79,16 @@ public: shard->search(query->global_parms.point, query->global_parms.k, pq); while (pq.size() > 0) { - results.emplace_back(*pq.peek().data); + results.emplace_back(pq.peek().data); pq.pop(); } return results; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> results; + LocalResultType results; Wrapped<R> wrec; wrec.rec = query->global_parms.point; @@ -118,41 +117,44 @@ public: } while (pq.size() > 0) { - results.emplace_back(*(pq.peek().data)); + results.emplace_back(pq.peek().data); pq.pop(); } - return std::move(results); + return results; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + + Wrapped<R> wrec; + wrec.rec = parms->point; + wrec.header = 0; + PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> pq(parms->k, &wrec); - PriorityQueue<R, DistCmpMax<R>> pq(parms->k, &(parms->point)); for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { if (pq.size() < parms->k) { - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } else { - double head_dist = pq.peek().data->calc_distance(parms->point); - double cur_dist = local_results[i][j].rec.calc_distance(parms->point); + double head_dist = pq.peek().data->rec.calc_distance(wrec.rec); + double cur_dist = local_results[i][j]->rec.calc_distance(wrec.rec); if (cur_dist < head_dist) { pq.pop(); - pq.push(&local_results[i][j].rec); + pq.push(local_results[i][j]); } } } } while (pq.size() > 0) { - output.emplace_back(*pq.peek().data); + output.emplace_back(pq.peek().data->rec); pq.pop(); } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index f3788de..65cffa7 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = true; constexpr static bool SKIP_DELETE_FILTER = true; @@ -65,8 +65,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; auto r = shard->point_lookup({query->global_parms.search_key, 0}); @@ -77,9 +77,8 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); @@ -95,8 +94,8 @@ public: static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -109,7 +108,7 @@ public: } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 68d304d..0898473 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -16,7 +16,7 @@ namespace de { namespace rc { -template <ShardInterface S, bool FORCE_SCAN = true> class Query { +template <ShardInterface S> class Query { typedef typename S::RECORD R; public: @@ -75,8 +75,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result = {0, 0}; /* * if the returned index is one past the end of the @@ -88,8 +88,6 @@ public: } auto ptr = shard->get_record_at(query->start_idx); - size_t reccnt = 0; - size_t tscnt = 0; /* * roll the pointer forward to the first record that is @@ -104,53 +102,48 @@ public: ptr->rec.key <= query->global_parms.upper_bound) { if (!ptr->is_deleted()) { - reccnt++; + result.record_count++; if (ptr->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } ptr++; } - result.push_back({reccnt, tscnt}); return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; - size_t reccnt = 0; - size_t tscnt = 0; + LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && rec->rec.key <= query->global_parms.upper_bound) { if (!rec->is_deleted()) { - reccnt++; + result.record_count++; if (rec->is_tombstone()) { - tscnt++; + result.tombstone_count++; } } } } - result.push_back({reccnt, tscnt}); - return result; } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; for (auto &local_result : local_results) { - reccnt += local_result[0].record_count; - tscnt += local_result[0].tombstone_count; + reccnt += local_result.record_count; + tscnt += local_result.tombstone_count; } /* if more tombstones than results, clamp the output at 0 */ @@ -158,10 +151,10 @@ public: tscnt = reccnt; } - output.push_back({reccnt - tscnt}); + output = reccnt - tscnt; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/rangequery.h b/include/query/rangequery.h index e7be39c..d9e7db8 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -38,8 +38,8 @@ public: Parameters global_parms; }; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<Wrapped<R>> LocalResultType; + typedef std::vector<R> ResultType; constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -69,8 +69,8 @@ public: return; } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; /* * if the returned index is one past the end of the @@ -101,10 +101,9 @@ public: return result; } - static std::vector<LocalResultType> - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { auto rec = query->buffer->get(i); if (rec->rec.key >= query->global_parms.lower_bound && @@ -116,26 +115,25 @@ public: return result; } - static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { - std::vector<Cursor<LocalResultType>> cursors; + static void combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { + std::vector<Cursor<Wrapped<R>>> cursors; cursors.reserve(local_results.size()); - psudb::PriorityQueue<LocalResultType> pq(local_results.size()); + psudb::PriorityQueue<Wrapped<R>> pq(local_results.size()); size_t total = 0; size_t tmp_n = local_results.size(); for (size_t i = 0; i < tmp_n; ++i) if (local_results[i].size() > 0) { auto base = local_results[i].data(); - cursors.emplace_back(Cursor<LocalResultType>{ + cursors.emplace_back(Cursor<Wrapped<R>>{ base, base + local_results[i].size(), 0, local_results[i].size()}); assert(i == cursors.size() - 1); total += local_results[i].size(); pq.push(cursors[i].ptr, tmp_n - i - 1); } else { - cursors.emplace_back(Cursor<LocalResultType>{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0}); } if (total == 0) { @@ -146,9 +144,8 @@ public: while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 - ? pq.peek(1) - : psudb::queue_record<LocalResultType>{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) + : psudb::queue_record<Wrapped<R>>{nullptr, 0}; if (!now.data->is_tombstone() && next.data != nullptr && now.data->rec == next.data->rec && next.data->is_tombstone()) { @@ -156,9 +153,9 @@ public: pq.pop(); auto &cursor1 = cursors[tmp_n - now.version - 1]; auto &cursor2 = cursors[tmp_n - next.version - 1]; - if (advance_cursor<LocalResultType>(cursor1)) + if (advance_cursor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor<LocalResultType>(cursor2)) + if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto &cursor = cursors[tmp_n - now.version - 1]; @@ -167,7 +164,7 @@ public: pq.pop(); - if (advance_cursor<LocalResultType>(cursor)) + if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version); } } @@ -175,7 +172,7 @@ public: return; } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { return false; diff --git a/include/query/wss.h b/include/query/wss.h index 54620ca..d4e75f3 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -49,8 +49,8 @@ public: constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = false; - typedef Wrapped<R> LocalResultType; - typedef R ResultType; + typedef std::vector<R> LocalResultType; + typedef std::vector<R> ResultType; static LocalQuery *local_preproc(S *shard, Parameters *parms) { auto query = new LocalQuery(); @@ -130,8 +130,8 @@ public: } } - static std::vector<LocalResultType> local_query(S *shard, LocalQuery *query) { - std::vector<LocalResultType> result; + static LocalResultType local_query(S *shard, LocalQuery *query) { + LocalResultType result; if (query->sample_size == 0) { return result; @@ -139,25 +139,25 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted()) { - result.emplace_back(*shard->get_record_at(idx)); + if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + result.emplace_back(shard->get_record_at(idx)->rec); } } return result; } - static std::vector<LocalResultType> + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { - std::vector<LocalResultType> result; + LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { auto idx = gsl_rng_uniform_int(query->global_parms.rng, query->cutoff); auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted()) { - result.emplace_back(*rec); + if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + result.emplace_back(rec->rec); } } @@ -165,16 +165,16 @@ public: } static void - combine(std::vector<std::vector<LocalResultType>> const &local_results, - Parameters *parms, std::vector<ResultType> &output) { + combine(std::vector<LocalResultType> const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { - output.emplace_back(local_results[i][j].rec); + output.emplace_back(local_results[i][j]); } } } - static bool repeat(Parameters *parms, std::vector<ResultType> &output, + static bool repeat(Parameters *parms, ResultType &output, std::vector<LocalQuery *> const &local_queries, LocalQueryBuffer *buffer_query) { if (output.size() < parms->sample_size) { diff --git a/tests/include/irs.h b/tests/include/irs.h index 1c5be2c..35a4bbb 100644 --- a/tests/include/irs.h +++ b/tests/include/irs.h @@ -18,7 +18,6 @@ #pragma once #include "query/irs.h" -#include <algorithm> /* * Uncomment these lines temporarily to remove errors in this file @@ -27,16 +26,16 @@ * should be included in the source file that includes this one, above the * include statement. */ -#include "shard/ISAMTree.h" -#include "query/irs.h" -#include "testing.h" -#include <check.h> -#include <gsl/gsl_rng.h> -using namespace de; +// #include "shard/ISAMTree.h" +// #include "query/irs.h" +// #include "testing.h" +// #include <check.h> +// #include <gsl/gsl_rng.h> +// using namespace de; -typedef Rec R; -typedef ISAMTree<R> Shard; -typedef irs::Query<ISAMTree<R>> Query; +// typedef Rec R; +// typedef ISAMTree<R> Shard; +// typedef irs::Query<ISAMTree<R>> Query; static gsl_rng *g_rng; @@ -60,8 +59,8 @@ START_TEST(t_irs) ck_assert_int_eq(result.size(), k); for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + ck_assert_int_le(result[i].key, parms.upper_bound); + ck_assert_int_ge(result[i].key, parms.lower_bound); } delete buffer; @@ -89,8 +88,8 @@ START_TEST(t_buffer_irs) ck_assert_int_le(result.size(), k); for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + ck_assert_int_le(result[i].key, parms.upper_bound); + ck_assert_int_ge(result[i].key, parms.lower_bound); } } @@ -128,7 +127,7 @@ START_TEST(t_irs_merge) irs::Query<Shard>::distribute_query(&parms, {query1, query2}, &dummy_buffer_query); - std::vector<std::vector<irs::Query<Shard>::LocalResultType>> results(2); + std::vector<irs::Query<Shard>::LocalResultType> results(2); results[0] = irs::Query<Shard>::local_query(&shard1, query1); results[1] = irs::Query<Shard>::local_query(&shard2, query2); delete query1; @@ -136,16 +135,16 @@ START_TEST(t_irs_merge) ck_assert_int_eq(results[0].size() + results[1].size(), k); - std::vector<std::vector<Wrapped<R>>> proc_results; + std::vector<std::vector<R>> proc_results; for (size_t j=0; j<results.size(); j++) { - proc_results.emplace_back(std::vector<Wrapped<R>>()); + proc_results.emplace_back(std::vector<R>()); for (size_t i=0; i<results[j].size(); i++) { proc_results[j].emplace_back(results[j][i]); } } - std::vector<irs::Query<Shard>::ResultType> result; + irs::Query<Shard>::ResultType result; irs::Query<Shard>::combine(proc_results, nullptr, result); ck_assert_int_eq(result.size(), k); @@ -154,7 +153,7 @@ START_TEST(t_irs_merge) } END_TEST -static void inject_irs_tests(Suite *suite) { +[[maybe_unused]] static void inject_irs_tests(Suite *suite) { g_rng = gsl_rng_alloc(gsl_rng_mt19937); TCase *irs = tcase_create("Independent Range Sampling Query Testing"); diff --git a/tests/include/pointlookup.h b/tests/include/pointlookup.h index af58440..f3a03dd 100644 --- a/tests/include/pointlookup.h +++ b/tests/include/pointlookup.h @@ -27,12 +27,12 @@ * include statement. */ -#include "shard/FSTrie.h" -#include "testing.h" -#include <check.h> -using namespace de; -typedef StringRec R; -typedef FSTrie<R> Shard; +// #include "shard/FSTrie.h" +// #include "testing.h" +// #include <check.h> +// using namespace de; +// typedef StringRec R; +// typedef FSTrie<R> Shard; START_TEST(t_point_lookup_query) { @@ -99,7 +99,7 @@ START_TEST(t_buffer_point_lookup) END_TEST -static void inject_pointlookup_tests(Suite *suite) { +[[maybe_unused]] static void inject_pointlookup_tests(Suite *suite) { TCase *point_lookup_query = tcase_create("Point Lookup Testing"); tcase_add_test(point_lookup_query, t_point_lookup_query); tcase_add_test(point_lookup_query, t_buffer_point_lookup); diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h index 22189b9..b77c77d 100644 --- a/tests/include/rangecount.h +++ b/tests/include/rangecount.h @@ -18,7 +18,6 @@ #pragma once #include "query/rangecount.h" -#include <algorithm> /* * Uncomment these lines temporarily to remove errors in this file @@ -28,7 +27,6 @@ * include statement. */ // #include "shard/ISAMTree.h" -// #include "query/rangequery.h" // #include "testing.h" // #include <check.h> // using namespace de; @@ -49,7 +47,7 @@ START_TEST(t_range_count) auto result = rc::Query<Shard>::local_query(&shard, local_query); delete local_query; - ck_assert_int_eq(result[0].record_count - result[0].tombstone_count, parms.upper_bound - parms.lower_bound + 1); + ck_assert_int_eq(result.record_count - result.tombstone_count, parms.upper_bound - parms.lower_bound + 1); delete buffer; } @@ -68,7 +66,7 @@ START_TEST(t_buffer_range_count) auto result = rc::Query<Shard>::local_query_buffer(query); delete query; - ck_assert_int_eq(result[0].record_count - result[0].tombstone_count, parms.upper_bound - parms.lower_bound + 1); + ck_assert_int_eq(result.record_count - result.tombstone_count, parms.upper_bound - parms.lower_bound + 1); } delete buffer; @@ -91,28 +89,28 @@ START_TEST(t_range_count_merge) auto query1 = rc::Query<Shard>::local_preproc(&shard1, &parms); auto query2 = rc::Query<Shard>::local_preproc(&shard2, &parms); - std::vector<std::vector<rc::Query<Shard>::LocalResultType>> results(2); + std::vector<rc::Query<Shard>::LocalResultType> results(2); results[0] = rc::Query<Shard>::local_query(&shard1, query1); results[1] = rc::Query<Shard>::local_query(&shard2, query2); delete query1; delete query2; - size_t reccnt = results[0][0].record_count + results[1][0].record_count; - size_t tscnt = results[0][0].tombstone_count + results[1][0].tombstone_count; + size_t reccnt = results[0].record_count + results[1].record_count; + size_t tscnt = results[0].tombstone_count + results[1].tombstone_count; ck_assert_int_eq(reccnt - tscnt, result_size); - std::vector<rc::Query<Shard>::ResultType> result; + rc::Query<Shard>::ResultType result; rc::Query<Shard>::combine(results, nullptr, result); - ck_assert_int_eq(result[0], result_size); + ck_assert_int_eq(result, result_size); delete buffer1; delete buffer2; } END_TEST -static void inject_rangecount_tests(Suite *suite) { +[[maybe_unused]] static void inject_rangecount_tests(Suite *suite) { TCase *range_count = tcase_create("Range Query Testing"); tcase_add_test(range_count, t_range_count); tcase_add_test(range_count, t_buffer_range_count); diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h index 5c3c1d6..f7bb7c1 100644 --- a/tests/include/rangequery.h +++ b/tests/include/rangequery.h @@ -99,7 +99,7 @@ START_TEST(t_range_query_merge) auto query1 = rq::Query<Shard>::local_preproc(&shard1, &parms); auto query2 = rq::Query<Shard>::local_preproc(&shard2, &parms); - std::vector<std::vector<rq::Query<Shard>::LocalResultType>> results(2); + std::vector<rq::Query<Shard>::LocalResultType> results(2); results[0] = rq::Query<Shard>::local_query(&shard1, query1); results[1] = rq::Query<Shard>::local_query(&shard2, query2); delete query1; @@ -116,7 +116,7 @@ START_TEST(t_range_query_merge) } } - std::vector<rq::Query<Shard>::ResultType> result; + rq::Query<Shard>::ResultType result; rq::Query<Shard>::combine(proc_results, nullptr, result); std::sort(result.begin(), result.end()); @@ -134,41 +134,7 @@ START_TEST(t_range_query_merge) } END_TEST - -START_TEST(t_lower_bound) -{ - auto buffer1 = create_sequential_mbuffer<R>(100, 200); - auto buffer2 = create_sequential_mbuffer<R>(400, 1000); - - auto shard1 = new Shard(buffer1->get_buffer_view()); - auto shard2 = new Shard(buffer2->get_buffer_view()); - - std::vector<Shard*> shards = {shard1, shard2}; - - auto merged = Shard(shards); - - for (uint32_t i=100; i<1000; i++) { - auto idx = merged.get_lower_bound(i); - - assert(idx < merged.get_record_count()); - - auto res = merged.get_record_at(idx); - - if (i >=200 && i <400) { - ck_assert_int_lt(res->rec.key, i); - } else { - ck_assert_int_eq(res->rec.key, i); - } - } - - delete buffer1; - delete buffer2; - delete shard1; - delete shard2; -} -END_TEST - -static void inject_rangequery_tests(Suite *suite) { +[[maybe_unused]] static void inject_rangequery_tests(Suite *suite) { TCase *range_query = tcase_create("Range Query Testing"); tcase_add_test(range_query, t_range_query); tcase_add_test(range_query, t_buffer_range_query); diff --git a/tests/include/shard_standard.h b/tests/include/shard_standard.h index ece2a57..de43edc 100644 --- a/tests/include/shard_standard.h +++ b/tests/include/shard_standard.h @@ -35,19 +35,27 @@ typedef ISAMTree<R> Shard; START_TEST(t_mbuffer_init) { auto buffer = new MutableBuffer<R>(512, 1024); + R r = {}; + for (uint64_t i = 512; i > 0; i--) { uint32_t v = i; - buffer->append({i, v}); + r.key = i; + r.value = v; + buffer->append(r); } for (uint64_t i = 1; i <= 256; ++i) { uint32_t v = i; - buffer->append({i, v}, true); + r.key = i; + r.value = v; + buffer->append(r, true); } for (uint64_t i = 257; i <= 512; ++i) { uint32_t v = i + 1; - buffer->append({i, v}); + r.key = i; + r.value = v; + buffer->append(r); } Shard* shard = new Shard(buffer->get_buffer_view()); diff --git a/tests/include/testing.h b/tests/include/testing.h index 33cbb3f..090221c 100644 --- a/tests/include/testing.h +++ b/tests/include/testing.h @@ -3,7 +3,7 @@ * * Unit test utility functions/definitions * - * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> + * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu> * Dong Xie <dongx@psu.edu> * * Distributed under the Modified BSD License. @@ -13,160 +13,184 @@ #include <string> -#include <unistd.h> #include <fcntl.h> #include <fstream> #include <sstream> +#include <unistd.h> -#include "util/types.h" -#include "psu-util/alignment.h" -#include "framework/structure/MutableBuffer.h" #include "framework/interface/Record.h" +#include "framework/structure/MutableBuffer.h" +#include "psu-util/alignment.h" +#include "util/types.h" typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec; typedef de::Record<uint64_t, uint32_t> Rec; typedef de::EuclidPoint<uint64_t> PRec; -typedef de::Record<const char*, uint64_t> StringRec; +typedef de::Record<const char *, uint64_t> StringRec; static std::string kjv_wordlist = "tests/data/kjv-wordlist.txt"; static std::string summa_wordlist = "tests/data/summa-wordlist.txt"; -static std::vector<std::unique_ptr<char[]>> string_data; +static struct sd { + std::vector<char *> data; + ~sd() { + for (size_t i = 0; i < data.size(); i++) { + delete data[i]; + } + } +} string_data; -[[maybe_unused]] static std::vector<StringRec> read_string_data(std::string fname, size_t n) { - std::vector<StringRec> vec; - vec.reserve(n); - string_data.reserve(n); +[[maybe_unused]] static std::vector<StringRec> +read_string_data(std::string fname, size_t n) { + std::vector<StringRec> vec; + vec.reserve(n); + string_data.data.reserve(n); - std::fstream file; - file.open(fname, std::ios::in); + std::fstream file; + file.open(fname, std::ios::in); - for (size_t i=0; i<n; i++) { - std::string line; - if (!std::getline(file, line, '\n')) break; + for (size_t i = 0; i < n; i++) { + std::string line; + if (!std::getline(file, line, '\n')) + break; - std::stringstream ls(line); - std::string field; + std::stringstream ls(line); + std::string field; - std::getline(ls, field, '\t'); - uint64_t val = atol(field.c_str()); - std::getline(ls, field, '\n'); + std::getline(ls, field, '\t'); + uint64_t val = atol(field.c_str()); + std::getline(ls, field, '\n'); - char *c = strdup(field.c_str()); + string_data.data.push_back(strdup(field.c_str())); - string_data.push_back(std::unique_ptr<char[]>(c)); + StringRec r{string_data.data[string_data.data.size() - 1], val, + field.size()}; - StringRec r{string_data[string_data.size() -1].get(), val, field.size()}; - - vec.push_back(r); - } + vec.push_back(r); + } - return vec; + return vec; } - -template <de::RecordInterface R> +template <de::RecordInterface R> std::vector<R> strip_wrapping(std::vector<de::Wrapped<R>> vec) { - std::vector<R> out(vec.size()); - for (uint32_t i=0; i<vec.size(); i++) { - out[i] = vec[i].rec; - } + std::vector<R> out(vec.size()); + for (uint32_t i = 0; i < vec.size(); i++) { + out[i] = vec[i].rec; + } - return out; + return out; } -[[maybe_unused]] static bool initialize_test_file(std::string fname, size_t page_cnt) -{ - auto flags = O_RDWR | O_CREAT | O_TRUNC; - mode_t mode = 0640; - char *page = nullptr; - - int fd = open(fname.c_str(), flags, mode); - if (fd == -1) { - goto error; +[[maybe_unused]] static bool initialize_test_file(std::string fname, + size_t page_cnt) { + auto flags = O_RDWR | O_CREAT | O_TRUNC; + mode_t mode = 0640; + char *page = nullptr; + + int fd = open(fname.c_str(), flags, mode); + if (fd == -1) { + goto error; + } + + page = (char *)aligned_alloc(psudb::SECTOR_SIZE, psudb::PAGE_SIZE); + if (!page) { + goto error_opened; + } + + for (size_t i = 0; i <= page_cnt; i++) { + *((int *)page) = i; + if (write(fd, page, psudb::PAGE_SIZE) == -1) { + goto error_alloced; } + } - page = (char *) aligned_alloc(psudb::SECTOR_SIZE, psudb::PAGE_SIZE); - if (!page) { - goto error_opened; - } + free(page); - for (size_t i=0; i<=page_cnt; i++) { - *((int *) page) = i; - if (write(fd, page, psudb::PAGE_SIZE) == -1) { - goto error_alloced; - } - } - - free(page); - - return 1; + return 1; error_alloced: - free(page); + free(page); error_opened: - close(fd); + close(fd); error: - return 0; + return 0; } -[[maybe_unused]] static bool roughly_equal(int n1, int n2, size_t mag, double epsilon) { - return ((double) std::abs(n1 - n2) / (double) mag) < epsilon; +[[maybe_unused]] static bool roughly_equal(int n1, int n2, size_t mag, + double epsilon) { + return ((double)std::abs(n1 - n2) / (double)mag) < epsilon; } template <de::RecordInterface R> -static de::MutableBuffer<R> *create_test_mbuffer(size_t cnt) -{ - auto buffer = new de::MutableBuffer<R>(cnt/2, cnt); - - if constexpr (de::KVPInterface<R>){ - if constexpr (std::is_same_v<decltype(R::key), const char*>){ - auto records = read_string_data(kjv_wordlist, cnt); - for (size_t i=0; i<cnt; i++) { - buffer->append(records[i]); - } +static de::MutableBuffer<R> *create_test_mbuffer(size_t cnt) { + auto buffer = new de::MutableBuffer<R>(cnt / 2, cnt); + R r = {}; + + if constexpr (de::KVPInterface<R>) { + if constexpr (std::is_same_v<decltype(R::key), const char *>) { + auto records = read_string_data(kjv_wordlist, cnt); + for (size_t i = 0; i < cnt; i++) { + buffer->append(records[i]); + } + } else { + for (size_t i = 0; i < cnt; i++) { + r.key = rand(); + r.value = rand(); + if constexpr (de::WeightedRecordInterface<R>) { + r.weight = 1; + buffer->append(r); } else { - for (size_t i = 0; i < cnt; i++) { - if constexpr (de::WeightedRecordInterface<R>) { - buffer->append({(uint64_t) rand(), (uint32_t) rand(), 1}); - } else { - buffer->append({(uint64_t) rand(), (uint32_t) rand()}); - } - } - } - } else if constexpr (de::NDRecordInterface<R>) { - for (size_t i=0; i<cnt; i++) { - buffer->append({(uint64_t) rand(), (uint64_t) rand()}); + buffer->append(r); } - } + } + } + } else if constexpr (de::NDRecordInterface<R>) { + for (size_t i = 0; i < cnt; i++) { + r.data[0] = rand(); + r.data[1] = rand(); + buffer->append(r); + } + } - return buffer; + return buffer; } template <de::RecordInterface R> -static de::MutableBuffer<R> *create_sequential_mbuffer(size_t start, size_t stop) -{ - size_t cnt = stop - start; - auto buffer = new de::MutableBuffer<R>(cnt/2, cnt); - - for (uint32_t i=start; i<stop; i++) { - - if constexpr (de::WeightedRecordInterface<R>) { - buffer->append({i, i, 1}); - } else { - buffer->append({i, i}); - } +static de::MutableBuffer<R> *create_sequential_mbuffer(size_t start, + size_t stop) { + size_t cnt = stop - start; + auto buffer = new de::MutableBuffer<R>(cnt / 2, cnt); + + R r = {}; + + for (uint32_t i = start; i < stop; i++) { + if constexpr (de::NDRecordInterface<R>) { + r.data[0] = i; + r.data[1] = i; + buffer->append(r); + } else { + r.key = i; + r.value = i; + if constexpr (de::WeightedRecordInterface<R>) { + r.weight = 1; + buffer->append(r); + } else { + buffer->append(r); + } } + } - return buffer; + return buffer; } /* template <de::KVPInterface R> -static de::MutableBuffer<R> *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt) +static de::MutableBuffer<R> *create_test_mbuffer_tombstones(size_t cnt, size_t +ts_cnt) { auto buffer = new de::MutableBuffer<R>(cnt/2, cnt); @@ -198,43 +222,55 @@ static de::MutableBuffer<R> *create_test_mbuffer_tombstones(size_t cnt, size_t t */ template <typename R> -requires de::WeightedRecordInterface<R> && de::KVPInterface<R> -static de::MutableBuffer<R> *create_weighted_mbuffer(size_t cnt) -{ - auto buffer = new de::MutableBuffer<R>(cnt/2, cnt); - - // Put in half of the count with weight one. - for (uint32_t i=0; i< cnt / 2; i++) { - buffer->append(R {1, i, 2}); - } - - // put in a quarter of the count with weight four. - for (uint32_t i=0; i< cnt / 4; i++) { - buffer->append(R {2, i, 4}); - } - - // the remaining quarter with weight eight. - for (uint32_t i=0; i< cnt / 4; i++) { - buffer->append(R {3, i, 8}); - } - - return buffer; + requires de::WeightedRecordInterface<R> && de::KVPInterface<R> +static de::MutableBuffer<R> *create_weighted_mbuffer(size_t cnt) { + auto buffer = new de::MutableBuffer<R>(cnt / 2, cnt); + R r = {}; + + // Put in half of the count with weight one. + for (uint32_t i = 0; i < cnt / 2; i++) { + r.key = 1; + r.value = i; + r.weight = 2; + buffer->append(r); + } + + // put in a quarter of the count with weight four. + for (uint32_t i = 0; i < cnt / 4; i++) { + r.key = 2; + r.value = i; + r.weight = 4; + buffer->append(r); + } + + // the remaining quarter with weight eight. + for (uint32_t i = 0; i < cnt / 4; i++) { + r.key = 3; + r.value = i; + r.weight = 8; + buffer->append(r); + } + + return buffer; } template <de::KVPInterface R> -static de::MutableBuffer<R> *create_double_seq_mbuffer(size_t cnt, bool ts=false) -{ - auto buffer = new de::MutableBuffer<R>(cnt/2, cnt); - - for (uint32_t i = 0; i < cnt / 2; i++) { - buffer->append({i, i}, ts); - } - - for (uint32_t i = 0; i < cnt / 2; i++) { - buffer->append({i, i+1}, ts); - } - - return buffer; +static de::MutableBuffer<R> *create_double_seq_mbuffer(size_t cnt, + bool ts = false) { + auto buffer = new de::MutableBuffer<R>(cnt / 2, cnt); + R r = {}; + + for (uint32_t i = 0; i < cnt / 2; i++) { + r.key = i; + r.value = i; + buffer->append(r, ts); + } + + for (uint32_t i = 0; i < cnt / 2; i++) { + r.key = i; + r.value = i + 1; + buffer->append(r, ts); + } + + return buffer; } - - diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp index c3e1b34..16f9269 100644 --- a/tests/mutable_buffer_tests.cpp +++ b/tests/mutable_buffer_tests.cpp @@ -52,7 +52,9 @@ START_TEST(t_insert) { auto buffer = new MutableBuffer<Rec>(50, 100); - Rec rec = {0, 5}; + Rec rec = {}; + rec.key = 0; + rec.value = 5; /* insert records up to the low watermark */ size_t cnt = 0; @@ -107,7 +109,10 @@ START_TEST(t_advance_head) /* insert 75 records and get tail when LWM is exceeded */ size_t new_head = 0; - Rec rec = {1, 1}; + Rec rec = {}; + rec.key = 1; + rec.value = 1; + size_t cnt = 0; for (size_t i=0; i<75; i++) { ck_assert_int_eq(buffer->append(rec), 1); @@ -204,7 +209,9 @@ START_TEST(t_truncate) auto buffer = new MutableBuffer<Rec>(50, 100); size_t ts_cnt = 0; - Rec rec = {0, 5}; + Rec rec = {}; + rec.key = 0; + rec.value = 5; for (size_t i=0; i<100; i++) { bool ts = false; @@ -244,7 +251,10 @@ START_TEST(t_bview_get) /* insert 75 records and get tail when LWM is exceeded */ size_t new_head = 0; - Rec rec = {1, 1}; + Rec rec = {}; + rec.key = 1; + rec.value = 1; + size_t cnt = 0; for (size_t i=0; i<75; i++) { ck_assert_int_eq(buffer->append(rec), 1); @@ -322,7 +332,10 @@ START_TEST(t_bview_delete) /* insert 75 records and get tail when LWM is exceeded */ size_t new_head = 0; - Rec rec = {1, 1}; + Rec rec = {}; + rec.key = 1; + rec.value = 1; + for (size_t i=0; i<75; i++) { ck_assert_int_eq(buffer->append(rec), 1); diff --git a/tests/vptree_tests.cpp b/tests/vptree_tests.cpp index 7e9e79e..53bb526 100644 --- a/tests/vptree_tests.cpp +++ b/tests/vptree_tests.cpp @@ -29,8 +29,8 @@ START_TEST(t_mbuffer_init) size_t n= 24; auto buffer = new MutableBuffer<PRec>(n/2, n); - for (int64_t i=0; i<n; i++) { - buffer->append({(uint64_t) i, (uint64_t) i}); + for (size_t i=0; i<n; i++) { + buffer->append({i, i}); } Shard* shard = new Shard(buffer->get_buffer_view()); @@ -139,7 +139,7 @@ START_TEST(t_buffer_query) std::sort(result.begin(), result.end()); size_t start = 120 - 5; for (size_t i=0; i<result.size(); i++) { - ck_assert_int_eq(result[i].rec.data[0], start++); + ck_assert_int_eq(result[i]->rec.data[0], start++); } } @@ -169,17 +169,17 @@ START_TEST(t_knn_query) std::sort(results.begin(), results.end()); if ((int64_t) (p.point.data[0] - p.k/2 - 1) < 0) { - ck_assert_int_eq(results[0].rec.data[0], 0); + ck_assert_int_eq(results[0]->rec.data[0], 0); } else { - ck_assert(results[0].rec.data[0] == (p.point.data[0] - p.k/2 - 1) || - results[0].rec.data[0] == (p.point.data[0] - p.k/2) || - results[0].rec.data[0] == (p.point.data[0] - p.k/2 + 1)); + ck_assert(results[0]->rec.data[0] == (p.point.data[0] - p.k/2 - 1) || + results[0]->rec.data[0] == (p.point.data[0] - p.k/2) || + results[0]->rec.data[0] == (p.point.data[0] - p.k/2 + 1)); } - size_t start = results[0].rec.data[0]; + size_t start = results[0]->rec.data[0]; for (size_t i=0; i<results.size(); i++) { - ck_assert_int_eq(results[i].rec.data[0], start++); + ck_assert_int_eq(results[i]->rec.data[0], start++); } } |