summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-07-03 15:19:20 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-07-03 15:19:20 -0400
commitf39e512ae3848abd1d3c68349c1a8fbe97be91b5 (patch)
treeaa95fdf6e496556ef7490909de1191a3f81a5584
parent8ecf203a77a897b25af084ceefd82023bfcc1c35 (diff)
downloaddynamic-extension-f39e512ae3848abd1d3c68349c1a8fbe97be91b5.tar.gz
Fixed query errors
-rw-r--r--include/framework/DynamicExtension.h2
-rw-r--r--include/shard/PGM.h22
-rw-r--r--tests/pgm_tests.cpp55
3 files changed, 64 insertions, 15 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 544cd8f..242db8e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -130,7 +130,7 @@ public:
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i] = filter_deletes(shard_results, shards[i].first, buffer);
+ query_results[i+1] = filter_deletes(shard_results, shards[i].first, buffer);
}
// Merge the results together
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 1622d7e..f3f5106 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -226,7 +226,6 @@ public:
return m_pgm.size_in_bytes();
}
-private:
size_t get_lower_bound(const K& key) const {
auto bound = m_pgm.search(key);
size_t idx = bound.lo;
@@ -257,9 +256,14 @@ private:
}
+ if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
+ return idx-1;
+ }
+
return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
}
+private:
Wrapped<R>* m_data;
size_t m_reccnt;
size_t m_tombstone_cnt;
@@ -307,10 +311,16 @@ public:
}
auto ptr = ts->get_record_at(s->start_idx);
- size_t i = 0;
- while (ptr[i].rec.key <= p->upper_bound && i < s->stop_idx - s->start_idx) {
- records.emplace_back(ptr[i]);
- i++;
+
+ // roll the pointer forward to the first record that is
+ // greater than or equal to the lower bound.
+ while(ptr->rec.key < p->lower_bound) {
+ ptr++;
+ }
+
+ while (ptr->rec.key <= p->upper_bound && ptr < ptr + s->stop_idx) {
+ records.emplace_back(*ptr);
+ ptr++;
}
return records;
@@ -326,10 +336,8 @@ public:
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
records.emplace_back(*rec);
}
-
}
-
return records;
}
diff --git a/tests/pgm_tests.cpp b/tests/pgm_tests.cpp
index d21980a..083b370 100644
--- a/tests/pgm_tests.cpp
+++ b/tests/pgm_tests.cpp
@@ -195,9 +195,11 @@ START_TEST(t_range_query_merge)
auto shard2 = Shard(buffer2);
pgm_range_query_parms<Rec> parms;
- parms.lower_bound = 300;
+ parms.lower_bound = 150;
parms.upper_bound = 500;
+ size_t result_size = parms.upper_bound - parms.lower_bound + 1 - 200;
+
auto state1 = PGMRangeQuery<Rec>::get_query_state(&shard1, &parms);
auto state2 = PGMRangeQuery<Rec>::get_query_state(&shard2, &parms);
@@ -208,27 +210,65 @@ START_TEST(t_range_query_merge)
PGMRangeQuery<Rec>::delete_query_state(state1);
PGMRangeQuery<Rec>::delete_query_state(state2);
- ck_assert_int_eq(results[0].size() + results[1].size(), 101);
+ ck_assert_int_eq(results[0].size() + results[1].size(), result_size);
std::vector<std::vector<Rec>> proc_results;
- auto key = 400;
for (size_t j=0; j<results.size(); j++) {
proc_results.emplace_back(std::vector<Rec>());
for (size_t i=0; i<results[j].size(); i++) {
proc_results[j].emplace_back(results[j][i].rec);
- ck_assert_int_eq(results[j][i].rec.key, key);
- key++;
}
}
auto result = PGMRangeQuery<Rec>::merge(proc_results);
std::sort(result.begin(), result.end());
- ck_assert_int_eq(result.size(), 101);
- key = 400;
+ ck_assert_int_eq(result.size(), result_size);
+ auto key = parms.lower_bound;
for (size_t i=0; i<result.size(); i++) {
ck_assert_int_eq(key++, result[i].key);
+ if (key == 200) {
+ key = 400;
+ }
+ }
+
+ delete buffer1;
+ delete buffer2;
+}
+END_TEST
+
+START_TEST(t_lower_bound)
+{
+ auto buffer1 = create_sequential_mbuffer<Rec>(100, 200);
+ auto buffer2 = create_sequential_mbuffer<Rec>(400, 1000);
+
+ de::PGM<Rec> *shards[2];
+
+ auto shard1 = Shard(buffer1);
+ auto shard2 = Shard(buffer2);
+
+ shards[0] = &shard1;
+ shards[1] = &shard2;
+
+ auto merged = Shard(shards, 2);
+
+ for (size_t i=100; i<1000; i++) {
+ Rec r;
+ r.key = i;
+ r.value = i;
+
+ auto idx = merged.get_lower_bound(i);
+
+ assert(idx < merged.get_record_count());
+
+ auto res = merged.get_record_at(idx);
+
+ if (i >=200 && i <400) {
+ ck_assert_int_lt(res->rec.key, i);
+ } else {
+ ck_assert_int_eq(res->rec.key, i);
+ }
}
delete buffer1;
@@ -286,6 +326,7 @@ Suite *unit_testing()
TCase *lookup = tcase_create("de:PGM:point_lookup Testing");
tcase_add_test(lookup, t_point_lookup);
tcase_add_test(lookup, t_point_lookup_miss);
+ tcase_add_test(lookup, t_lower_bound);
suite_add_tcase(unit, lookup);
TCase *range_query = tcase_create("de:PGM::range_query Testing");