From 4ac2e14d24a1fdd3f9bf777775b16bf6a677f487 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 22 Jan 2024 10:14:05 -0500 Subject: Added RangeCount query --- CMakeLists.txt | 7 ++ include/query/rangecount.h | 169 +++++++++++++++++++++++++++++++++++++++++++++ tests/include/rangecount.h | 155 +++++++++++++++++++++++++++++++++++++++++ tests/rangecount_tests.cpp | 55 +++++++++++++++ 4 files changed, 386 insertions(+) create mode 100644 include/query/rangecount.h create mode 100644 tests/include/rangecount.h create mode 100644 tests/rangecount_tests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e286d7..a051361 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,6 +52,13 @@ if (tests) target_link_options(rangequery_tests PUBLIC -mcx16) target_include_directories(rangequery_tests PRIVATE include external/psudb-common/cpp/include) + + add_executable(rangecount_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/rangecount_tests.cpp) + target_link_libraries(rangecount_tests PUBLIC gsl check subunit pthread atomic) + target_link_options(rangecount_tests PUBLIC -mcx16) + target_include_directories(rangecount_tests PRIVATE include external/psudb-common/cpp/include) + + #add_executable(vptree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/vptree_tests.cpp) #target_link_libraries(vptree_tests PUBLIC gsl check subunit pthread) #target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include) diff --git a/include/query/rangecount.h b/include/query/rangecount.h new file mode 100644 index 0000000..7d88b1d --- /dev/null +++ b/include/query/rangecount.h @@ -0,0 +1,169 @@ +/* + * include/query/rangecount.h + * + * Copyright (C) 2023 Douglas B. Rumbaugh + * + * Distributed under the Modified BSD License. + * + * A query class for single dimensional range count queries. This query + * requires that the shard support get_lower_bound(key) and + * get_record_at(index). + */ +#pragma once + +#include "framework/interface/Record.h" +#include "framework/interface/Shard.h" +#include "framework/structure/BufferView.h" +#include "psu-ds/PriorityQueue.h" +#include "util/Cursor.h" + +namespace de { namespace rc { + +template +struct Parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; +}; + +template +struct State { + size_t start_idx; + size_t stop_idx; +}; + +template +struct BufferState { + BufferView buffer; + + BufferState(BufferView buffer) + : buffer(std::move(buffer)) {} +}; + +template +class Query { +public: + constexpr static bool EARLY_ABORT=false; + constexpr static bool SKIP_DELETE_FILTER=true; + + static void *get_query_state(S *shard, void *parms) { + auto res = new State(); + auto p = (Parms *) parms; + + res->start_idx = shard->get_lower_bound(p->lower_bound); + res->stop_idx = shard->get_record_count(); + + return res; + } + + static void* get_buffer_query_state(BufferView buffer, void *parms) { + auto res = new BufferState(std::move(buffer)); + + return res; + } + + static void process_query_states(void *query_parms, std::vector &shard_states, void* buffer_states) { + return; + } + + static std::vector> query(S *shard, void *q_state, void *parms) { + std::vector> records; + auto p = (Parms *) parms; + auto s = (State *) q_state; + + size_t reccnt = 0; + size_t tscnt = 0; + + Wrapped res; + res.rec.key= 0; // records + res.rec.value = 0; // tombstones + records.emplace_back(res); + + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ + if (s->start_idx == shard->get_record_count()) { + return records; + } + + auto ptr = shard->get_record_at(s->start_idx); + + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ + while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { + ptr++; + } + + while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { + if (!ptr->is_deleted()) { + if (ptr->is_tombstone()) { + records[0].rec.value++; + } else { + records[0].rec.key++; + } + } + + ptr++; + } + + return records; + } + + static std::vector> buffer_query(void *state, void *parms) { + auto p = (Parms *) parms; + auto s = (BufferState *) state; + + std::vector> records; + + Wrapped res; + res.rec.key= 0; // records + res.rec.value = 0; // tombstones + records.emplace_back(res); + + for (size_t i=0; ibuffer.get_record_count(); i++) { + auto rec = s->buffer.get(i); + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound + && !rec->is_deleted()) { + if (rec->is_tombstone()) { + records[0].rec.value++; + } else { + records[0].rec.key++; + } + } + } + + return records; + } + + static std::vector merge(std::vector>> &results, void *parms) { + + R res; + res.key = 0; + res.value = 0; + std::vector output; + output.emplace_back(res); + + for (size_t i=0; i *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (BufferState *) state; + delete s; + } +}; + +}} diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h new file mode 100644 index 0000000..83bf4d4 --- /dev/null +++ b/tests/include/rangecount.h @@ -0,0 +1,155 @@ +/* + * tests/include/rangecount.h + * + * Standardized unit tests for range queries against supporting + * shard types + * + * Copyright (C) 2023 Douglas Rumbaugh + * + * Distributed under the Modified BSD License. + * + * WARNING: This file must be included in the main unit test set + * after the definition of an appropriate Shard and Rec + * type. In particular, Rec needs to implement the key-value + * pair interface and Shard needs to support lower_bound. + * For other types of record and shard, you'll need to + * use a different set of unit tests. + */ +#pragma once + +/* + * Uncomment these lines temporarily to remove errors in this file + * temporarily for development purposes. They should be removed prior + * to building, to ensure no duplicate definitions. These includes/defines + * should be included in the source file that includes this one, above the + * include statement. + */ +//#include "shard/ISAMTree.h" +//#include "query/rangecount.h" +//#include "testing.h" +//#include +//using namespace de; +//typedef ISAMTree Shard; + +START_TEST(t_range_count) +{ + auto buffer = create_sequential_mbuffer(100, 1000); + auto shard = Shard(buffer->get_buffer_view()); + + rc::Parms parms; + parms.lower_bound = 300; + parms.upper_bound = 500; + + auto state = rc::Query::get_query_state(&shard, &parms); + auto result = rc::Query::query(&shard, state, &parms); + rc::Query::delete_query_state(state); + + ck_assert_int_eq(result.size(), 1); + ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1); + + delete buffer; +} +END_TEST + + +START_TEST(t_buffer_range_count) +{ + auto buffer = create_sequential_mbuffer(100, 1000); + + rc::Parms parms; + parms.lower_bound = 300; + parms.upper_bound = 500; + + auto state = rc::Query::get_buffer_query_state(buffer->get_buffer_view(), &parms); + auto result = rc::Query::buffer_query(state, &parms); + rc::Query::delete_buffer_query_state(state); + + ck_assert_int_eq(result.size(), 1); + ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1); + + delete buffer; +} +END_TEST + + +START_TEST(t_range_count_merge) +{ + auto buffer1 = create_sequential_mbuffer(100, 200); + auto buffer2 = create_sequential_mbuffer(400, 1000); + + auto shard1 = Shard(buffer1->get_buffer_view()); + auto shard2 = Shard(buffer2->get_buffer_view()); + + rc::Parms parms; + parms.lower_bound = 150; + parms.upper_bound = 500; + + size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200; + + auto state1 = rc::Query::get_query_state(&shard1, &parms); + auto state2 = rc::Query::get_query_state(&shard2, &parms); + + std::vector>> results(2); + results[0] = rc::Query::query(&shard1, state1, &parms); + results[1] = rc::Query::query(&shard2, state2, &parms); + + rc::Query::delete_query_state(state1); + rc::Query::delete_query_state(state2); + + ck_assert_int_eq(results[0].size(), 1); + ck_assert_int_eq(results[1].size(), 1); + + auto result = rc::Query::merge(results, nullptr); + + ck_assert_int_eq(result[0].key, result_size); + + delete buffer1; + delete buffer2; +} +END_TEST + + +START_TEST(t_lower_bound) +{ + auto buffer1 = create_sequential_mbuffer(100, 200); + auto buffer2 = create_sequential_mbuffer(400, 1000); + + auto shard1 = new Shard(buffer1->get_buffer_view()); + auto shard2 = new Shard(buffer2->get_buffer_view()); + + std::vector shards = {shard1, shard2}; + + auto merged = Shard(shards); + + for (size_t i=100; i<1000; i++) { + Rec r; + r.key = i; + r.value = i; + + auto idx = merged.get_lower_bound(i); + + assert(idx < merged.get_record_count()); + + auto res = merged.get_record_at(idx); + + if (i >=200 && i <400) { + ck_assert_int_lt(res->rec.key, i); + } else { + ck_assert_int_eq(res->rec.key, i); + } + } + + delete buffer1; + delete buffer2; + delete shard1; + delete shard2; +} +END_TEST + +static void inject_rangecount_tests(Suite *suite) { + TCase *range_count = tcase_create("Range Query Testing"); + tcase_add_test(range_count, t_range_count); + tcase_add_test(range_count, t_buffer_range_count); + tcase_add_test(range_count, t_range_count_merge); + suite_add_tcase(suite, range_count); +} diff --git a/tests/rangecount_tests.cpp b/tests/rangecount_tests.cpp new file mode 100644 index 0000000..fe3a587 --- /dev/null +++ b/tests/rangecount_tests.cpp @@ -0,0 +1,55 @@ +/* + * tests/rangequery_tests.cpp + * + * Unit tests for Range Queries across several different + * shards + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * Distributed under the Modified BSD License. + * + */ + +#include "shard/ISAMTree.h" +#include "query/rangecount.h" +#include "include/testing.h" + +#include + +using namespace de; + +typedef ISAMTree Shard; + +#include "include/rangecount.h" + + +Suite *unit_testing() +{ + Suite *unit = suite_create("Range Count Query Testing"); + inject_rangecount_tests(unit); + + return unit; +} + + +int shard_unit_tests() +{ + int failed = 0; + Suite *unit = unit_testing(); + SRunner *unit_shardner = srunner_create(unit); + + srunner_run_all(unit_shardner, CK_NORMAL); + failed = srunner_ntests_failed(unit_shardner); + srunner_free(unit_shardner); + + return failed; +} + + +int main() +{ + int unit_failed = shard_unit_tests(); + + return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} -- cgit v1.2.3