summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-22 10:14:05 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-22 10:15:46 -0500
commit4ac2e14d24a1fdd3f9bf777775b16bf6a677f487 (patch)
tree2faee0bb55d67a6bda4a789e2d04436e118e598f
parent38693c342558628c75e0ab0d23c32a95a499ed8b (diff)
downloaddynamic-extension-4ac2e14d24a1fdd3f9bf777775b16bf6a677f487.tar.gz
Added RangeCount query
-rw-r--r--CMakeLists.txt7
-rw-r--r--include/query/rangecount.h169
-rw-r--r--tests/include/rangecount.h155
-rw-r--r--tests/rangecount_tests.cpp55
4 files changed, 386 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6e286d7..a051361 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -52,6 +52,13 @@ if (tests)
target_link_options(rangequery_tests PUBLIC -mcx16)
target_include_directories(rangequery_tests PRIVATE include external/psudb-common/cpp/include)
+
+ add_executable(rangecount_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/rangecount_tests.cpp)
+ target_link_libraries(rangecount_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(rangecount_tests PUBLIC -mcx16)
+ target_include_directories(rangecount_tests PRIVATE include external/psudb-common/cpp/include)
+
+
#add_executable(vptree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/vptree_tests.cpp)
#target_link_libraries(vptree_tests PUBLIC gsl check subunit pthread)
#target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include)
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
new file mode 100644
index 0000000..7d88b1d
--- /dev/null
+++ b/include/query/rangecount.h
@@ -0,0 +1,169 @@
+/*
+ * include/query/rangecount.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A query class for single dimensional range count queries. This query
+ * requires that the shard support get_lower_bound(key) and
+ * get_record_at(index).
+ */
+#pragma once
+
+#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
+#include "framework/structure/BufferView.h"
+#include "psu-ds/PriorityQueue.h"
+#include "util/Cursor.h"
+
+namespace de { namespace rc {
+
+template <RecordInterface R>
+struct Parms {
+ decltype(R::key) lower_bound;
+ decltype(R::key) upper_bound;
+};
+
+template <RecordInterface R>
+struct State {
+ size_t start_idx;
+ size_t stop_idx;
+};
+
+template <RecordInterface R>
+struct BufferState {
+ BufferView<R> buffer;
+
+ BufferState(BufferView<R> buffer)
+ : buffer(std::move(buffer)) {}
+};
+
+template <ShardInterface S, KVPInterface R>
+class Query {
+public:
+ constexpr static bool EARLY_ABORT=false;
+ constexpr static bool SKIP_DELETE_FILTER=true;
+
+ static void *get_query_state(S *shard, void *parms) {
+ auto res = new State<R>();
+ auto p = (Parms<R> *) parms;
+
+ res->start_idx = shard->get_lower_bound(p->lower_bound);
+ res->stop_idx = shard->get_record_count();
+
+ return res;
+ }
+
+ static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
+ auto res = new BufferState<R>(std::move(buffer));
+
+ return res;
+ }
+
+ static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_states) {
+ return;
+ }
+
+ static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) {
+ std::vector<Wrapped<R>> records;
+ auto p = (Parms<R> *) parms;
+ auto s = (State<R> *) q_state;
+
+ size_t reccnt = 0;
+ size_t tscnt = 0;
+
+ Wrapped<R> res;
+ res.rec.key= 0; // records
+ res.rec.value = 0; // tombstones
+ records.emplace_back(res);
+
+ /*
+ * if the returned index is one past the end of the
+ * records for the PGM, then there are not records
+ * in the index falling into the specified range.
+ */
+ if (s->start_idx == shard->get_record_count()) {
+ return records;
+ }
+
+ auto ptr = shard->get_record_at(s->start_idx);
+
+ /*
+ * roll the pointer forward to the first record that is
+ * greater than or equal to the lower bound.
+ */
+ while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) {
+ ptr++;
+ }
+
+ while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) {
+ if (!ptr->is_deleted()) {
+ if (ptr->is_tombstone()) {
+ records[0].rec.value++;
+ } else {
+ records[0].rec.key++;
+ }
+ }
+
+ ptr++;
+ }
+
+ return records;
+ }
+
+ static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
+ auto p = (Parms<R> *) parms;
+ auto s = (BufferState<R> *) state;
+
+ std::vector<Wrapped<R>> records;
+
+ Wrapped<R> res;
+ res.rec.key= 0; // records
+ res.rec.value = 0; // tombstones
+ records.emplace_back(res);
+
+ for (size_t i=0; i<s->buffer.get_record_count(); i++) {
+ auto rec = s->buffer.get(i);
+ if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound
+ && !rec->is_deleted()) {
+ if (rec->is_tombstone()) {
+ records[0].rec.value++;
+ } else {
+ records[0].rec.key++;
+ }
+ }
+ }
+
+ return records;
+ }
+
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
+
+ R res;
+ res.key = 0;
+ res.value = 0;
+ std::vector<R> output;
+ output.emplace_back(res);
+
+ for (size_t i=0; i<results.size(); i++) {
+ output[0].key += results[i][0].rec.key; // records
+ output[0].value += results[i][0].rec.value; // tombstones
+ }
+
+ output[0].key -= output[0].value;
+ return output;
+ }
+
+ static void delete_query_state(void *state) {
+ auto s = (State<R> *) state;
+ delete s;
+ }
+
+ static void delete_buffer_query_state(void *state) {
+ auto s = (BufferState<R> *) state;
+ delete s;
+ }
+};
+
+}}
diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h
new file mode 100644
index 0000000..83bf4d4
--- /dev/null
+++ b/tests/include/rangecount.h
@@ -0,0 +1,155 @@
+/*
+ * tests/include/rangecount.h
+ *
+ * Standardized unit tests for range queries against supporting
+ * shard types
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * WARNING: This file must be included in the main unit test set
+ * after the definition of an appropriate Shard and Rec
+ * type. In particular, Rec needs to implement the key-value
+ * pair interface and Shard needs to support lower_bound.
+ * For other types of record and shard, you'll need to
+ * use a different set of unit tests.
+ */
+#pragma once
+
+/*
+ * Uncomment these lines temporarily to remove errors in this file
+ * temporarily for development purposes. They should be removed prior
+ * to building, to ensure no duplicate definitions. These includes/defines
+ * should be included in the source file that includes this one, above the
+ * include statement.
+ */
+//#include "shard/ISAMTree.h"
+//#include "query/rangecount.h"
+//#include "testing.h"
+//#include <check.h>
+//using namespace de;
+//typedef ISAMTree<Rec> Shard;
+
+START_TEST(t_range_count)
+{
+ auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto shard = Shard(buffer->get_buffer_view());
+
+ rc::Parms<Rec> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = rc::Query<Shard, Rec>::get_query_state(&shard, &parms);
+ auto result = rc::Query<Shard, Rec>::query(&shard, state, &parms);
+ rc::Query<Shard, Rec>::delete_query_state(state);
+
+ ck_assert_int_eq(result.size(), 1);
+ ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1);
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_buffer_range_count)
+{
+ auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+
+ rc::Parms<Rec> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = rc::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms);
+ auto result = rc::Query<Shard, Rec>::buffer_query(state, &parms);
+ rc::Query<Shard, Rec>::delete_buffer_query_state(state);
+
+ ck_assert_int_eq(result.size(), 1);
+ ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1);
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_range_count_merge)
+{
+ auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+
+ auto shard1 = Shard(buffer1->get_buffer_view());
+ auto shard2 = Shard(buffer2->get_buffer_view());
+
+ rc::Parms<Rec> parms;
+ parms.lower_bound = 150;
+ parms.upper_bound = 500;
+
+ size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200;
+
+ auto state1 = rc::Query<Shard, Rec>::get_query_state(&shard1, &parms);
+ auto state2 = rc::Query<Shard, Rec>::get_query_state(&shard2, &parms);
+
+ std::vector<std::vector<de::Wrapped<Rec>>> results(2);
+ results[0] = rc::Query<Shard, Rec>::query(&shard1, state1, &parms);
+ results[1] = rc::Query<Shard, Rec>::query(&shard2, state2, &parms);
+
+ rc::Query<Shard, Rec>::delete_query_state(state1);
+ rc::Query<Shard, Rec>::delete_query_state(state2);
+
+ ck_assert_int_eq(results[0].size(), 1);
+ ck_assert_int_eq(results[1].size(), 1);
+
+ auto result = rc::Query<Shard, Rec>::merge(results, nullptr);
+
+ ck_assert_int_eq(result[0].key, result_size);
+
+ delete buffer1;
+ delete buffer2;
+}
+END_TEST
+
+
+START_TEST(t_lower_bound)
+{
+ auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+
+ auto shard1 = new Shard(buffer1->get_buffer_view());
+ auto shard2 = new Shard(buffer2->get_buffer_view());
+
+ std::vector<Shard*> shards = {shard1, shard2};
+
+ auto merged = Shard(shards);
+
+ for (size_t i=100; i<1000; i++) {
+ Rec r;
+ r.key = i;
+ r.value = i;
+
+ auto idx = merged.get_lower_bound(i);
+
+ assert(idx < merged.get_record_count());
+
+ auto res = merged.get_record_at(idx);
+
+ if (i >=200 && i <400) {
+ ck_assert_int_lt(res->rec.key, i);
+ } else {
+ ck_assert_int_eq(res->rec.key, i);
+ }
+ }
+
+ delete buffer1;
+ delete buffer2;
+ delete shard1;
+ delete shard2;
+}
+END_TEST
+
+static void inject_rangecount_tests(Suite *suite) {
+ TCase *range_count = tcase_create("Range Query Testing");
+ tcase_add_test(range_count, t_range_count);
+ tcase_add_test(range_count, t_buffer_range_count);
+ tcase_add_test(range_count, t_range_count_merge);
+ suite_add_tcase(suite, range_count);
+}
diff --git a/tests/rangecount_tests.cpp b/tests/rangecount_tests.cpp
new file mode 100644
index 0000000..fe3a587
--- /dev/null
+++ b/tests/rangecount_tests.cpp
@@ -0,0 +1,55 @@
+/*
+ * tests/rangequery_tests.cpp
+ *
+ * Unit tests for Range Queries across several different
+ * shards
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+
+#include "shard/ISAMTree.h"
+#include "query/rangecount.h"
+#include "include/testing.h"
+
+#include <check.h>
+
+using namespace de;
+
+typedef ISAMTree<Rec> Shard;
+
+#include "include/rangecount.h"
+
+
+Suite *unit_testing()
+{
+ Suite *unit = suite_create("Range Count Query Testing");
+ inject_rangecount_tests(unit);
+
+ return unit;
+}
+
+
+int shard_unit_tests()
+{
+ int failed = 0;
+ Suite *unit = unit_testing();
+ SRunner *unit_shardner = srunner_create(unit);
+
+ srunner_run_all(unit_shardner, CK_NORMAL);
+ failed = srunner_ntests_failed(unit_shardner);
+ srunner_free(unit_shardner);
+
+ return failed;
+}
+
+
+int main()
+{
+ int unit_failed = shard_unit_tests();
+
+ return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}