summary refs log tree commit diff stats
path: root/include/shard
diff options
context:
space:
mode:
author Dong Xie <dongx@psu.edu> 2023-07-28 02:31:52 -0400
committer Dong Xie <dongx@psu.edu> 2023-07-28 02:31:52 -0400
commit b0dea94f86ba96a7c319764d6b7d920d202ffa7e (patch)
tree 0cf79186457a42ece044115d25d4f5fe97c1fec7 /include/shard
parent bd5295f1163839c23d6da0d8f353bfc308e272a4 (diff)
download dynamic-extension-b0dea94f86ba96a7c319764d6b7d920d202ffa7e.tar.gz
Change PGM to range count.
Diffstat (limited to 'include/shard')
-rw-r--r-- include/shard/PGM.h 57
-rw-r--r-- include/shard/TrieSpline.h 33
2 files changed, 58 insertions, 32 deletions
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index aba5227..af20594 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -76,7 +76,7 @@ public:
m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
std::vector<K> keys;
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
+ //m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
size_t offset = 0;
m_reccnt = 0;
@@ -106,11 +106,12 @@ public:
base->header &= 3;
m_data[m_reccnt++] = *base;
keys.emplace_back(base->rec.key);
-
+
+ /*
if (m_bf && base->is_tombstone()) {
m_tombstone_cnt++;
m_bf->insert(base->rec);
- }
+ }*/
base++;
}
@@ -143,7 +144,7 @@ public:
}
}
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ //m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
assert(m_alloc_size % CACHELINE_SIZE == 0);
@@ -167,10 +168,10 @@ public:
if (!cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
keys.emplace_back(cursor.ptr->rec.key);
- if (m_bf && cursor.ptr->is_tombstone()) {
+ /*if (m_bf && cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
+ }*/
}
pq.pop();
@@ -185,14 +186,14 @@ public:
~PGM() {
if (m_data) free(m_data);
- if (m_bf) delete m_bf;
+ //if (m_bf) delete m_bf;
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
- return nullptr;
- }
+ //if (filter && !m_bf->lookup(rec)) {
+ // return nullptr;
+ //}
size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
@@ -275,7 +276,7 @@ private:
K m_max_key;
K m_min_key;
pgm::PGMIndex<K, epsilon> m_pgm;
- BloomFilter<R> *m_bf;
+ //BloomFilter<R> *m_bf;
};
template <RecordInterface R>
class PGMPointLookup {
@@ -349,7 +350,7 @@ template <RecordInterface R>
class PGMRangeQuery {
public:
constexpr static bool EARLY_ABORT=false;
- constexpr static bool SKIP_DELETE_FILTER=false;
+ constexpr static bool SKIP_DELETE_FILTER=false;
static void *get_query_state(PGM<R> *ts, void *parms) {
auto res = new PGMState<R>();
@@ -373,7 +374,8 @@ public:
}
static std::vector<Wrapped<R>> query(PGM<R> *ts, void *q_state, void *parms) {
- std::vector<Wrapped<R>> records;
+ size_t tot = 0;
+ //std::vector<Wrapped<R>> records;
auto p = (pgm_range_query_parms<R> *) parms;
auto s = (PGMState<R> *) q_state;
@@ -381,7 +383,7 @@ public:
// records for the PGM, then there are not records
// in the index falling into the specified range.
if (s->start_idx == ts->get_record_count()) {
- return records;
+ return {};
}
auto ptr = ts->get_record_at(s->start_idx);
@@ -393,30 +395,37 @@ public:
}
while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) {
- records.emplace_back(*ptr);
+ if (ptr->is_tombstone()) --tot;
+ else if (!ptr->is_deleted()) ++tot;
+ //records.emplace_back(*ptr);
ptr++;
}
- return records;
+ return {Wrapped<R>{0, {tot, 0}}};
+ //return records;
}
static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ size_t tot = 0;
auto p = (pgm_range_query_parms<R> *) parms;
auto s = (PGMBufferState<R> *) state;
- std::vector<Wrapped<R>> records;
+ //std::vector<Wrapped<R>> records;
for (size_t i=0; i<s->cutoff; i++) {
auto rec = buffer->get_data() + i;
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
- records.emplace_back(*rec);
+ if (rec->is_tombstone()) --tot;
+ else if (!rec->is_deleted()) ++tot;
+ //records.emplace_back(*rec);
}
}
- return records;
+ return {Wrapped<R>{0, {tot, 0}}};
+ //return records;
}
static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
- std::vector<Cursor<Wrapped<R>>> cursors;
+ /*std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(results.size());
PriorityQueue<Wrapped<R>> pq(results.size());
@@ -460,9 +469,13 @@ public:
if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
}
- }
+ }*/
- return output;
+ size_t tot = 0;
+ for (auto& result: results)
+ if (result.size() > 0) tot += result[0].rec.key;
+
+ return {{tot, 0}};
}
static void delete_query_state(void *state) {
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 7d1a90d..973b684 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -325,7 +325,8 @@ public:
}
static std::vector<Wrapped<R>> query(TrieSpline<R> *ts, void *q_state, void *parms) {
- std::vector<Wrapped<R>> records;
+ //std::vector<Wrapped<R>> records;
+ size_t tot = 0;
auto p = (ts_range_query_parms<R> *) parms;
auto s = (TrieSplineState<R> *) q_state;
@@ -333,7 +334,7 @@ public:
// records for the TrieSpline, then there are not records
// in the index falling into the specified range.
if (s->start_idx == ts->get_record_count()) {
- return records;
+ return {};
}
auto ptr = ts->get_record_at(s->start_idx);
@@ -346,32 +347,39 @@ public:
while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) {
- records.emplace_back(*ptr);
+ if (ptr->is_tombstone()) --tot;
+ else if (!ptr->is_deleted()) ++tot;
+ //records.emplace_back(*ptr);
ptr++;
}
- return records;
+ return {Wrapped<R>{0, {tot, 0}}};
+ //return records;
}
static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ size_t tot = 0;
auto p = (ts_range_query_parms<R> *) parms;
auto s = (TrieSplineBufferState<R> *) state;
- std::vector<Wrapped<R>> records;
+ //std::vector<Wrapped<R>> records;
for (size_t i=0; i<s->cutoff; i++) {
auto rec = buffer->get_data() + i;
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
- records.emplace_back(*rec);
+ if (rec->is_tombstone()) --tot;
+ else if (!rec->is_deleted()) ++tot;
+ //records.emplace_back(*rec);
}
}
-
- return records;
+ return {Wrapped<R>{0, {tot, 0}}};
+ //return records;
}
static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
- std::vector<Cursor<Wrapped<R>>> cursors;
+/*
+ std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(results.size());
PriorityQueue<Wrapped<R>> pq(results.size());
@@ -417,8 +425,13 @@ public:
}
}
- return output;
+ return output;*/
+
+ size_t tot = 0;
+ for (auto& result: results)
+ if (result.size() > 0) tot += result[0].rec.key;
+ return {{tot, 0}};
}
static void delete_query_state(void *state) {