From 724f6d759979f9e207528cb89139c0a19269663a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 5 Jun 2023 09:37:55 -0400 Subject: Implemented initial query class for MemISAM --- include/shard/MemISAM.h | 210 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 156 insertions(+), 54 deletions(-) (limited to 'include/shard/MemISAM.h') diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index 7bfbd3f..f7e9f51 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -24,10 +24,36 @@ namespace de { thread_local size_t mrun_cancelations = 0; +template +struct irs_query_parms { + decltype(R::key) lower_bound; + decltype(R::key) upper_bound; + size_t sample_size; + gsl_rng *rng; +}; + +template +class IRSQuery; + +template +struct IRSState { + size_t lower_bound; + size_t upper_bound; +}; + +template +struct IRSBufferState { + size_t cutoff; + std::vector> records; +}; + + template class MemISAM { private: friend class InternalLevel; + friend class IRSQuery; + friend class IRSQuery; typedef decltype(R::key) K; typedef decltype(R::value) V; @@ -97,8 +123,6 @@ public: } TIMER_STOP(); auto level_time = TIMER_RESULT(); - - //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); } MemISAM(MemISAM** runs, size_t len) @@ -168,59 +192,46 @@ public: if (m_bf) delete m_bf; } - R* sorted_output() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } - - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } - - R *point_lookup(const R &rec, bool filter) { - + Wrapped *point_lookup(const R &rec, bool filter=false) { if (filter && !m_bf->lookup(rec.key)) { return nullptr; } size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { - return false; + return nullptr; } - while (idx < m_reccnt && m_data[idx] < rec) ++idx; + while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; - if (m_data[idx] == rec) { + if (m_data[idx].rec == rec) { return m_data + idx; } return nullptr; } - bool delete_record(const K& key, const V& val) { - size_t idx = get_lower_bound(key); - if (idx >= m_reccnt) { - return false; - } - - while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx; - - if (m_data[idx] == R {key, val}) { - m_data[idx].set_delete_status(); - m_deleted_cnt++; - return true; - } + R* get_data() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } - return false; + size_t get_tombstone_count() const { + return m_tombstone_cnt; } const R* get_record_at(size_t idx) const { return (idx < m_reccnt) ? m_data + idx : nullptr; } + size_t get_memory_usage() { + return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size; + } + +private: size_t get_lower_bound(const K& key) const { const InMemISAMNode* now = m_root; while (!is_leaf(reinterpret_cast(now))) { @@ -261,27 +272,6 @@ public: return pos - m_data; } - bool check_tombstone(const R& rec) const{ - if (!m_bf->lookup(rec.key)) { - return false; - } - - size_t idx = get_lower_bound(key); - if (idx >= m_reccnt) { - return false; - } - - R* ptr = m_data + idx; - - while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++; - return *ptr == R {key, val} && ptr->is_tombstone(); - } - - size_t get_memory_usage() { - return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size; - } - -private: void build_internal_levels() { size_t n_leaf_nodes = m_reccnt / inmem_isam_leaf_fanout + (m_reccnt % inmem_isam_leaf_fanout != 0); size_t level_node_cnt = n_leaf_nodes; @@ -357,4 +347,116 @@ private: size_t m_deleted_cnt; }; +template +class WIRSQuery { +public: + + static void *get_query_state(MemISAM *isam, void *parms) { + auto res = new IRSState(); + decltype(R::key) lower_key = ((irs_query_parms *) parms)->lower_bound; + decltype(R::key) upper_key = ((irs_query_parms *) parms)->upper_bound; + + res->lower_bound = isam->get_lower_bound(lower_key); + res->upper_bound = isam->get_upper_bound(upper_key); + + return res; + } + + static void* get_buffer_query_state(MutableBuffer *buffer, void *parms) { + auto res = new IRSBufferState(); + + res->cutoff = buffer->get_record_count(); + + if constexpr (Rejection) { + return res; + } + + auto lower_key = ((irs_query_parms *) parms)->lower_bound; + auto upper_key = ((irs_query_parms *) parms)->upper_bound; + + for (size_t i=0; icutoff; i++) { + if (((buffer->get_data() + i)->key >= lower_key) && ((buffer->get_data() + i) <= upper_key)) { + res->records.emplace_back(*(buffer->get_data() + i)); + } + } + + return res; + } + + static std::vector> query(MemISAM *isam, void *q_state, void *parms) { + auto sample_sz = ((irs_query_parms *) parms)->sample_size; + auto lower_key = ((irs_query_parms *) parms)->lower_bound; + auto upper_key = ((irs_query_parms *) parms)->upper_bound; + auto rng = ((irs_query_parms *) parms)->rng; + + auto state = (IRSState *) q_state; + + std::vector> result_set; + + if (sample_sz == 0) { + return result_set; + } + + size_t attempts = 0; + size_t range_length = state->upper_bound - state->lower_bound; + do { + attempts++; + size_t idx = gsl_rng_uniform_int(rng, range_length); + result_set.emplace_back(isam->get_record_at(idx)); + } while (attempts < sample_sz); + + return result_set; + } + + static std::vector> buffer_query(MutableBuffer *buffer, void *state, void *parms) { + auto st = (IRSBufferState *) state; + auto p = (irs_query_parms *) parms; + + std::vector> result; + result.reserve(p->sample_size); + + if constexpr (Rejection) { + for (size_t i=0; isample_size; i++) { + auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); + auto rec = buffer->get_data() + idx; + + if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { + result.emplace_back(*rec); + } + } + + return result; + } + + for (size_t i=0; isample_size; i++) { + auto idx = gsl_rng_uniform_int(p->rng, st->records.size()); + result.emplace_back(st->records[idx]); + } + + return result; + } + + static std::vector merge(std::vector> &results) { + std::vector output; + + for (size_t i=0; i *) state; + delete s; + } + + static void delete_buffer_query_state(void *state) { + auto s = (IRSBufferState *) state; + delete s; + } +}; + } -- cgit v1.2.3