summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-06-05 14:25:19 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-06-05 14:25:19 -0400
commitd47eeea719448f649e93b6a9ec7593b4cb2fb40e (patch)
tree0880f4c5b2c2881adb5dce5afa8474b6ccd54450
parent79b5e0b630ee9f53535fc8469e450024af7439e1 (diff)
downloaddynamic-extension-d47eeea719448f649e93b6a9ec7593b4cb2fb40e.tar.gz
Added TrieSpline and PGM Range queries + tests and bugfixes
-rw-r--r--include/shard/PGM.h84
-rw-r--r--include/shard/TrieSpline.h109
-rw-r--r--tests/pgm_tests.cpp59
-rw-r--r--tests/testing.h21
-rw-r--r--tests/triespline_tests.cpp67
5 files changed, 257 insertions, 83 deletions
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 9fad8d0..f9e1dad 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -28,13 +28,13 @@
namespace de {
template <RecordInterface R>
-struct ts_range_query_parms {
+struct pgm_range_query_parms {
decltype(R::key) lower_bound;
decltype(R::key) upper_bound;
};
-template <RecordInterface R, bool Rejection>
-class PGMLookup;
+template <RecordInterface R>
+class PGMRangeQuery;
template <RecordInterface R>
struct PGMState {
@@ -46,7 +46,6 @@ template <RecordInterface R>
struct PGMBufferState {
size_t cutoff;
Alias* alias;
- decltype(R::weight) max_weight;
~PGMBufferState() {
delete alias;
@@ -63,8 +62,7 @@ private:
public:
// FIXME: there has to be a better way to do this
- friend class PGMLookup<R, true>;
- friend class PGMLookup<R, false>;
+ friend class PGMRangeQuery<R>;
PGM(MutableBuffer<R>* buffer)
: m_reccnt(0), m_tombstone_cnt(0) {
@@ -225,10 +223,6 @@ public:
}
private:
-
- // FIXME: depending upon the size of the returned bound,
- // it may be better to switch between binary search and
- // linear scan.
size_t get_lower_bound(const K& key) const {
auto bound = m_pgm.search(key);
size_t idx = bound.lo;
@@ -237,29 +231,29 @@ private:
return m_reccnt;
}
- // if the found location _is_ the key, we're done.
- if (m_data[idx].rec.key == key) {
- return idx;
- }
-
- // if the found location is larger than the key, we need to
- // move backwards towards the beginning of the array
- if (m_data[idx].rec.key > key) {
- for (ssize_t i=idx; i>=0; i--) {
- if (m_data[i].rec.key < key) {
- return i+1;
- }
+ // For a small search bound (fewer than 256 slots), linearly scan
+ // forward to the first record whose key is not less than the target.
+ if (bound.hi - bound.lo < 256) {
+ while (idx < bound.hi && m_data[idx].rec.key < key) {
+ idx++;
}
- // otherwise, we move forward towards the end
} else {
- for (size_t i=idx; i<m_reccnt; i++) {
- if (m_data[i].rec.key >= key) {
- return i - 1;
+ // Otherwise, binary search [bound.lo, bound.hi) for the same slot
+ idx = bound.lo;
+ size_t max = bound.hi;
+
+ while (idx < max) {
+ size_t mid = (idx + max) / 2;
+ if (key > m_data[mid].rec.key) {
+ idx = mid + 1;
+ } else {
+ max = mid;
}
}
+ // idx: first slot in the bound with key >= target, or bound.hi
}
- return m_reccnt;
+ return (idx < m_reccnt && m_data[idx].rec.key >= key) ? idx : m_reccnt;
}
Wrapped<R>* m_data;
@@ -277,7 +271,7 @@ class PGMRangeQuery {
public:
static void *get_query_state(PGM<R> *ts, void *parms) {
auto res = new PGMState<R>();
- auto p = (ts_range_query_parms<R> *) parms;
+ auto p = (pgm_range_query_parms<R> *) parms;
res->start_idx = ts->get_lower_bound(p->lower_bound);
res->stop_idx = ts->get_record_count();
@@ -287,18 +281,26 @@ public:
static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
auto res = new PGMBufferState<R>();
- res.cutoff = buffer->get_record_count();
+ res->cutoff = buffer->get_record_count();
return res;
}
static std::vector<Wrapped<R>> query(PGM<R> *ts, void *q_state, void *parms) {
std::vector<Wrapped<R>> records;
- auto p = (ts_range_query_parms<R> *) parms;
+ auto p = (pgm_range_query_parms<R> *) parms;
auto s = (PGMState<R> *) q_state;
- auto ptr = ts->get_record_at(s->lower_bound);
+
+ // If the start index is one past the end of the records
+ // for the PGM, then there are no records in the index
+ // falling into the specified range; return an empty result.
+ if (s->start_idx == ts->get_record_count()) {
+ return records;
+ }
+ // The scan below bound-checks i before reading ptr[i].
+ auto ptr = ts->get_record_at(s->start_idx);
size_t i = 0;
- while (ptr->rec.key <= p->upper_bound && i < s->stop_idx) {
+ while (i < s->stop_idx - s->start_idx && ptr[i].rec.key <= p->upper_bound) {
records.emplace_back(ptr[i]);
i++;
}
@@ -307,11 +309,20 @@ public:
}
static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
- auto p = (ts_range_query_parms<R> *) parms;
+ auto p = (pgm_range_query_parms<R> *) parms;
auto s = (PGMBufferState<R> *) state;
+ std::vector<Wrapped<R>> records;
+ for (size_t i=0; i<s->cutoff; i++) {
+ auto rec = buffer->get_data() + i;
+ if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
+ records.emplace_back(*rec);
+ }
+ }
+
+ return records;
}
static std::vector<R> merge(std::vector<std::vector<R>> &results) {
@@ -335,11 +346,8 @@ public:
auto s = (PGMBufferState<R> *) state;
delete s;
}
-
-
- //{q.get_buffer_query_state(p, p)};
- //{q.buffer_query(p, p)};
-
};
+;
+
}
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 13753a1..fb0ed70 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -30,28 +30,24 @@ namespace de {
size_t g_max_error = 1024;
template <RecordInterface R>
-struct ts_lookup_parms {
- size_t sample_size;
- gsl_rng *rng;
+struct ts_range_query_parms {
+ decltype(R::key) lower_bound;
+ decltype(R::key) upper_bound;
};
-template <RecordInterface R, bool Rejection>
-class TrieSplineLookup;
+template <RecordInterface R>
+class TrieSplineRangeQuery;
template <RecordInterface R>
struct TrieSplineState {
- decltype(R::weight) tot_weight;
-
- TrieSplineState() {
- tot_weight = 0;
- }
+ size_t start_idx;
+ size_t stop_idx;
};
template <RecordInterface R>
struct TrieSplineBufferState {
size_t cutoff;
Alias* alias;
- decltype(R::weight) max_weight;
~TrieSplineBufferState() {
delete alias;
@@ -68,8 +64,7 @@ private:
public:
// FIXME: there has to be a better way to do this
- friend class TrieSplineLookup<R, true>;
- friend class TrieSplineLookup<R, false>;
+ friend class TrieSplineRangeQuery<R>;
TrieSpline(MutableBuffer<R>* buffer)
: m_reccnt(0), m_tombstone_cnt(0) {
@@ -247,9 +242,6 @@ public:
private:
- // FIXME: depending upon the size of the returned bound,
- // it may be better to switch between binary search and
- // linear scan.
size_t get_lower_bound(const K& key) const {
auto bound = m_ts.GetSearchBound(key);
size_t idx = bound.begin;
@@ -258,29 +250,29 @@ private:
return m_reccnt;
}
- // if the found location _is_ the key, we're done.
- if (m_data[idx].rec.key == key) {
- return idx;
- }
-
- // if the found location is larger than the key, we need to
- // move backwards towards the beginning of the array
- if (m_data[idx].rec.key > key) {
- for (ssize_t i=idx; i>=0; i--) {
- if (m_data[i].rec.key < key) {
- return i+1;
- }
+ // For a small search bound (fewer than 256 slots), linearly scan
+ // forward to the first record whose key is not less than the target.
+ if (bound.end - bound.begin < 256) {
+ while (idx < bound.end && m_data[idx].rec.key < key) {
+ idx++;
}
- // otherwise, we move forward towards the end
} else {
- for (size_t i=idx; i<m_reccnt; i++) {
- if (m_data[i].rec.key >= key) {
- return i - 1;
+ // Otherwise, binary search [bound.begin, bound.end) for the same slot
+ idx = bound.begin;
+ size_t max = bound.end;
+
+ while (idx < max) {
+ size_t mid = (idx + max) / 2;
+ if (key > m_data[mid].rec.key) {
+ idx = mid + 1;
+ } else {
+ max = mid;
}
}
+ // idx: first slot in the bound with key >= target, or bound.end
}
- return m_reccnt;
+ return (idx < m_reccnt && m_data[idx].rec.key >= key) ? idx : m_reccnt;
}
Wrapped<R>* m_data;
@@ -293,25 +285,63 @@ private:
};
-template <RecordInterface R, bool Rejection=true>
-class TrieSplineLookup {
+template <RecordInterface R>
+class TrieSplineRangeQuery {
public:
- static void *get_query_state(TrieSpline<R> *wss, void *parms) {
+ static void *get_query_state(TrieSpline<R> *ts, void *parms) {
auto res = new TrieSplineState<R>();
+ auto p = (ts_range_query_parms<R> *) parms;
+
+ res->start_idx = ts->get_lower_bound(p->lower_bound);
+ res->stop_idx = ts->get_record_count();
return res;
}
static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
+ auto res = new TrieSplineBufferState<R>();
+ res->cutoff = buffer->get_record_count();
+ return res;
}
- static std::vector<Wrapped<R>> query(TrieSpline<R> *wss, void *q_state, void *parms) {
+ static std::vector<Wrapped<R>> query(TrieSpline<R> *ts, void *q_state, void *parms) {
+ std::vector<Wrapped<R>> records;
+ auto p = (ts_range_query_parms<R> *) parms;
+ auto s = (TrieSplineState<R> *) q_state;
+
+ // If the start index is one past the end of the records
+ // for the TrieSpline, then there are no records in the index
+ // falling into the specified range; return an empty result.
+ if (s->start_idx == ts->get_record_count()) {
+ return records;
+ }
+ // The scan below bound-checks i before reading ptr[i].
+ auto ptr = ts->get_record_at(s->start_idx);
+ size_t i = 0;
+ while (i < s->stop_idx - s->start_idx && ptr[i].rec.key <= p->upper_bound) {
+ records.emplace_back(ptr[i]);
+ i++;
+ }
+ return records;
}
static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ auto p = (ts_range_query_parms<R> *) parms;
+ auto s = (TrieSplineBufferState<R> *) state;
+
+ std::vector<Wrapped<R>> records;
+ for (size_t i=0; i<s->cutoff; i++) {
+ auto rec = buffer->get_data() + i;
+ if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
+ records.emplace_back(*rec);
+ }
+ }
+
+
+ return records;
}
static std::vector<R> merge(std::vector<std::vector<R>> &results) {
@@ -335,11 +365,6 @@ public:
auto s = (TrieSplineBufferState<R> *) state;
delete s;
}
-
-
- //{q.get_buffer_query_state(p, p)};
- //{q.buffer_query(p, p)};
-
};
}
diff --git a/tests/pgm_tests.cpp b/tests/pgm_tests.cpp
index 33979ae..254de03 100644
--- a/tests/pgm_tests.cpp
+++ b/tests/pgm_tests.cpp
@@ -139,6 +139,60 @@ START_TEST(t_point_lookup_miss)
}
+START_TEST(t_range_query)
+{
+ auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto shard = Shard(buffer);
+
+ pgm_range_query_parms<Rec> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = PGMRangeQuery<Rec>::get_query_state(&shard, &parms);
+ auto result = PGMRangeQuery<Rec>::query(&shard, state, &parms);
+ PGMRangeQuery<Rec>::delete_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_buffer_range_query)
+{
+ auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+
+ pgm_range_query_parms<Rec> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = PGMRangeQuery<Rec>::get_buffer_query_state(buffer, &parms);
+ auto result = PGMRangeQuery<Rec>::buffer_query(buffer, state, &parms);
+ PGMRangeQuery<Rec>::delete_buffer_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_range_query_merge)
+{
+
+}
+END_TEST
+
+
START_TEST(t_full_cancelation)
{
size_t n = 100;
@@ -190,6 +244,11 @@ Suite *unit_testing()
tcase_add_test(lookup, t_point_lookup_miss);
suite_add_tcase(unit, lookup);
+ TCase *range_query = tcase_create("de:PGM::range_query Testing");
+ tcase_add_test(range_query, t_range_query);
+ tcase_add_test(range_query, t_buffer_range_query);
+ tcase_add_test(range_query, t_range_query_merge);
+ suite_add_tcase(unit, range_query);
return unit;
}
diff --git a/tests/testing.h b/tests/testing.h
index 0be65e3..4277015 100644
--- a/tests/testing.h
+++ b/tests/testing.h
@@ -96,6 +96,27 @@ static de::MutableBuffer<R> *create_test_mbuffer(size_t cnt)
}
template <de::RecordInterface R>
+static de::MutableBuffer<R> *create_sequential_mbuffer(decltype(R::key) start, decltype(R::key) stop)
+{
+ size_t cnt = stop - start;
+ auto buffer = new de::MutableBuffer<R>(cnt, true, cnt);
+
+ for (size_t i=start; i<stop; i++) {
+ R rec;
+ rec.key = i;
+ rec.value = i;
+
+ if constexpr (de::WeightedRecordInterface<R>) {
+ rec.weight = 1;
+ }
+
+ buffer->append(rec);
+ }
+
+ return buffer;
+}
+
+template <de::RecordInterface R>
static de::MutableBuffer<R> *create_test_mbuffer_tombstones(size_t cnt, size_t ts_cnt)
{
auto buffer = new de::MutableBuffer<R>(cnt, true, ts_cnt);
diff --git a/tests/triespline_tests.cpp b/tests/triespline_tests.cpp
index 982be79..d88d4b1 100644
--- a/tests/triespline_tests.cpp
+++ b/tests/triespline_tests.cpp
@@ -1,5 +1,5 @@
/*
- * tests/irs_tests.cpp
+ * tests/triespline_tests.cpp
*
* Unit tests for TrieSpline (Augmented B+Tree) shard
*
@@ -45,7 +45,7 @@ START_TEST(t_mbuffer_init)
}
-START_TEST(t_irs_init)
+START_TEST(t_init)
{
size_t n = 512;
auto mbuffer1 = create_test_mbuffer<Rec>(n);
@@ -169,13 +169,67 @@ START_TEST(t_full_cancelation)
END_TEST
+START_TEST(t_range_query)
+{
+ auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+ auto shard = Shard(buffer);
+
+ ts_range_query_parms<Rec> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = TrieSplineRangeQuery<Rec>::get_query_state(&shard, &parms);
+ auto result = TrieSplineRangeQuery<Rec>::query(&shard, state, &parms);
+ TrieSplineRangeQuery<Rec>::delete_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_buffer_range_query)
+{
+ auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
+
+ ts_range_query_parms<Rec> parms;
+ parms.lower_bound = 300;
+ parms.upper_bound = 500;
+
+ auto state = TrieSplineRangeQuery<Rec>::get_buffer_query_state(buffer, &parms);
+ auto result = TrieSplineRangeQuery<Rec>::buffer_query(buffer, state, &parms);
+ TrieSplineRangeQuery<Rec>::delete_buffer_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
+
+ delete buffer;
+}
+END_TEST
+
+
+START_TEST(t_range_query_merge)
+{
+
+}
+END_TEST
+
+
Suite *unit_testing()
{
Suite *unit = suite_create("TrieSpline Shard Unit Testing");
TCase *create = tcase_create("de::TrieSpline constructor Testing");
tcase_add_test(create, t_mbuffer_init);
- tcase_add_test(create, t_irs_init);
+ tcase_add_test(create, t_init);
tcase_set_timeout(create, 100);
suite_add_tcase(unit, create);
@@ -191,6 +245,13 @@ Suite *unit_testing()
suite_add_tcase(unit, lookup);
+ TCase *range_query = tcase_create("de:TrieSpline::range_query Testing");
+ tcase_add_test(range_query, t_range_query);
+ tcase_add_test(range_query, t_buffer_range_query);
+ tcase_add_test(range_query, t_range_query_merge);
+ suite_add_tcase(unit, range_query);
+
+
return unit;
}