From b0dea94f86ba96a7c319764d6b7d920d202ffa7e Mon Sep 17 00:00:00 2001 From: Dong Xie Date: Fri, 28 Jul 2023 02:31:52 -0400 Subject: Change PGM to range count. --- include/shard/PGM.h | 57 ++++++++++++++++++++++++++++------------------ include/shard/TrieSpline.h | 33 +++++++++++++++++++-------- 2 files changed, 58 insertions(+), 32 deletions(-) (limited to 'include') diff --git a/include/shard/PGM.h b/include/shard/PGM.h index aba5227..af20594 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -76,7 +76,7 @@ public: m_data = (Wrapped*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); std::vector keys; - m_bf = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); + //m_bf = new BloomFilter(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS); size_t offset = 0; m_reccnt = 0; @@ -106,11 +106,12 @@ public: base->header &= 3; m_data[m_reccnt++] = *base; keys.emplace_back(base->rec.key); - + + /* if (m_bf && base->is_tombstone()) { m_tombstone_cnt++; m_bf->insert(base->rec); - } + }*/ base++; } @@ -143,7 +144,7 @@ public: } } - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); + //m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); m_alloc_size = (attemp_reccnt * sizeof(Wrapped)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped)) % CACHELINE_SIZE); assert(m_alloc_size % CACHELINE_SIZE == 0); @@ -167,10 +168,10 @@ public: if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; keys.emplace_back(cursor.ptr->rec.key); - if (m_bf && cursor.ptr->is_tombstone()) { + /*if (m_bf && cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; if (m_bf) m_bf->insert(cursor.ptr->rec); - } + }*/ } pq.pop(); @@ -185,14 +186,14 @@ public: ~PGM() { if (m_data) free(m_data); - if (m_bf) delete m_bf; + //if (m_bf) delete m_bf; } Wrapped *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { - return nullptr; - } + //if (filter && !m_bf->lookup(rec)) { + // return nullptr; + //} size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { @@ -275,7 +276,7 @@ private: K m_max_key; K m_min_key; pgm::PGMIndex m_pgm; - BloomFilter *m_bf; + //BloomFilter *m_bf; }; template class PGMPointLookup { @@ -349,7 +350,7 @@ template class PGMRangeQuery { public: constexpr static bool EARLY_ABORT=false; - constexpr static bool SKIP_DELETE_FILTER=false; + constexpr static bool SKIP_DELETE_FILTER=false; static void *get_query_state(PGM *ts, void *parms) { auto res = new PGMState(); @@ -373,7 +374,8 @@ public: } static std::vector> query(PGM *ts, void *q_state, void *parms) { - std::vector> records; + size_t tot = 0; + //std::vector> records; auto p = (pgm_range_query_parms *) parms; auto s = (PGMState *) q_state; @@ -381,7 +383,7 @@ public: // records for the PGM, then there are not records // in the index falling into the specified range. if (s->start_idx == ts->get_record_count()) { - return records; + return {}; } auto ptr = ts->get_record_at(s->start_idx); @@ -393,30 +395,37 @@ public: } while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) { - records.emplace_back(*ptr); + if (ptr->is_tombstone()) --tot; + else if (!ptr->is_deleted()) ++tot; + //records.emplace_back(*ptr); ptr++; } - return records; + return {Wrapped{0, {tot, 0}}}; + //return records; } static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + size_t tot = 0; auto p = (pgm_range_query_parms *) parms; auto s = (PGMBufferState *) state; - std::vector> records; + //std::vector> records; for (size_t i=0; icutoff; i++) { auto rec = buffer->get_data() + i; if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { - records.emplace_back(*rec); + if (rec->is_tombstone()) --tot; + else if (!rec->is_deleted()) ++tot; + //records.emplace_back(*rec); } } - return records; + return {Wrapped{0, {tot, 0}}}; + //return records; } static std::vector merge(std::vector>> &results, void *parms) { - std::vector>> cursors; + /*std::vector>> cursors; cursors.reserve(results.size()); PriorityQueue> pq(results.size()); @@ -460,9 +469,13 @@ public: if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); } - } + }*/ - return output; + size_t tot = 0; + for (auto& result: results) + if (result.size() > 0) tot += result[0].rec.key; + + return {{tot, 0}}; } static void delete_query_state(void *state) { diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 7d1a90d..973b684 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -325,7 +325,8 @@ public: } static std::vector> query(TrieSpline *ts, void *q_state, void *parms) { - std::vector> records; + //std::vector> records; + size_t tot = 0; auto p = (ts_range_query_parms *) parms; auto s = (TrieSplineState *) q_state; @@ -333,7 +334,7 @@ public: // records for the TrieSpline, then there are not records // in the index falling into the specified range. if (s->start_idx == ts->get_record_count()) { - return records; + return {}; } auto ptr = ts->get_record_at(s->start_idx); @@ -346,32 +347,39 @@ public: while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) { - records.emplace_back(*ptr); + if (ptr->is_tombstone()) --tot; + else if (!ptr->is_deleted()) ++tot; + //records.emplace_back(*ptr); ptr++; } - return records; + return {Wrapped{0, {tot, 0}}}; + //return records; } static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + size_t tot = 0; auto p = (ts_range_query_parms *) parms; auto s = (TrieSplineBufferState *) state; - std::vector> records; + //std::vector> records; for (size_t i=0; icutoff; i++) { auto rec = buffer->get_data() + i; if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { - records.emplace_back(*rec); + if (rec->is_tombstone()) --tot; + else if (!rec->is_deleted()) ++tot; + //records.emplace_back(*rec); } } - - return records; + return {Wrapped{0, {tot, 0}}}; + //return records; } static std::vector merge(std::vector>> &results, void *parms) { - std::vector>> cursors; +/* + std::vector>> cursors; cursors.reserve(results.size()); PriorityQueue> pq(results.size()); @@ -417,8 +425,13 @@ public: } } - return output; + return output;*/ + + size_t tot = 0; + for (auto& result: results) + if (result.size() > 0) tot += result[0].rec.key; + return {{tot, 0}}; } static void delete_query_state(void *state) { -- cgit v1.2.3