summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Douglas Rumbaugh <dbr4@psu.edu> 2023-06-05 09:37:55 -0400
committer: Douglas Rumbaugh <dbr4@psu.edu> 2023-06-05 09:50:50 -0400
commit: 724f6d759979f9e207528cb89139c0a19269663a (patch)
tree: 1263a54fb81aef586846aaf4f2a86fd6c979fbfa
parent: cf6761544e9af62de260041e9c2a0e1f5e3bcdd5 (diff)
download: dynamic-extension-724f6d759979f9e207528cb89139c0a19269663a.tar.gz
Implemented initial query class for MemISAM
-rw-r--r--include/shard/MemISAM.h210
1 files changed, 156 insertions, 54 deletions
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index 7bfbd3f..f7e9f51 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -25,9 +25,35 @@ namespace de {
thread_local size_t mrun_cancelations = 0;
template <RecordInterface R>
+struct irs_query_parms { // query parameters; the query routines receive this through a void* and cast it back
+ decltype(R::key) lower_bound; // lowest key accepted (buffer_query tests key >= lower_bound)
+ decltype(R::key) upper_bound; // highest key accepted (buffer_query tests key <= upper_bound)
+ size_t sample_size; // number of sampling iterations each query routine performs
+ gsl_rng *rng; // generator passed to gsl_rng_uniform_int when drawing indices
+};
+
+template <RecordInterface R, bool Rejection>
+class IRSQuery; // forward declaration; MemISAM names IRSQuery<R, true> and IRSQuery<R, false> as friends
+
+template <RecordInterface R>
+struct IRSState { // per-shard query state allocated by get_query_state
+ size_t lower_bound; // value returned by MemISAM::get_lower_bound for the query's lower key
+ size_t upper_bound; // value returned by MemISAM::get_upper_bound for the query's upper key
+};
+
+template <RecordInterface R>
+struct IRSBufferState { // buffer query state allocated by get_buffer_query_state
+ size_t cutoff; // buffer record count captured when the state was created
+ std::vector<Wrapped<R>> records; // copies of in-range buffer records; filled only when Rejection is false
+};
+
+
+template <RecordInterface R>
class MemISAM {
private:
friend class InternalLevel;
+ friend class IRSQuery<R, true>;
+ friend class IRSQuery<R, false>;
typedef decltype(R::key) K;
typedef decltype(R::value) V;
@@ -97,8 +123,6 @@ public:
}
TIMER_STOP();
auto level_time = TIMER_RESULT();
-
- //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
}
MemISAM(MemISAM** runs, size_t len)
@@ -168,59 +192,46 @@ public:
if (m_bf) delete m_bf;
}
- R* sorted_output() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
-
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
-
- R *point_lookup(const R &rec, bool filter) {
-
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
if (filter && !m_bf->lookup(rec.key)) {
return nullptr;
}
size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
- return false;
+ return nullptr;
}
- while (idx < m_reccnt && m_data[idx] < rec) ++idx;
+ while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
- if (m_data[idx] == rec) {
+ if (m_data[idx].rec == rec) {
return m_data + idx;
}
return nullptr;
}
- bool delete_record(const K& key, const V& val) {
- size_t idx = get_lower_bound(key);
- if (idx >= m_reccnt) {
- return false;
- }
-
- while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
-
- if (m_data[idx] == R {key, val}) {
- m_data[idx].set_delete_status();
- m_deleted_cnt++;
- return true;
- }
+ R* get_data() const {
+ return m_data;
+ }
+
+ size_t get_record_count() const {
+ return m_reccnt;
+ }
- return false;
+ size_t get_tombstone_count() const {
+ return m_tombstone_cnt;
}
const R* get_record_at(size_t idx) const {
return (idx < m_reccnt) ? m_data + idx : nullptr;
}
+ size_t get_memory_usage() {
+ return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size;
+ }
+
+private:
size_t get_lower_bound(const K& key) const {
const InMemISAMNode* now = m_root;
while (!is_leaf(reinterpret_cast<const char*>(now))) {
@@ -261,27 +272,6 @@ public:
return pos - m_data;
}
- bool check_tombstone(const R& rec) const{
- if (!m_bf->lookup(rec.key)) {
- return false;
- }
-
- size_t idx = get_lower_bound(key);
- if (idx >= m_reccnt) {
- return false;
- }
-
- R* ptr = m_data + idx;
-
- while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++;
- return *ptr == R {key, val} && ptr->is_tombstone();
- }
-
- size_t get_memory_usage() {
- return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size;
- }
-
-private:
void build_internal_levels() {
size_t n_leaf_nodes = m_reccnt / inmem_isam_leaf_fanout + (m_reccnt % inmem_isam_leaf_fanout != 0);
size_t level_node_cnt = n_leaf_nodes;
@@ -357,4 +347,116 @@ private:
size_t m_deleted_cnt;
};
+/*
+ * Independent range sampling (IRS) query over a MemISAM shard and a
+ * MutableBuffer. All entry points are static and exchange their parameters
+ * and per-query state through void pointers: `parms` always points at an
+ * irs_query_parms<R>, shard state at an IRSState<R>, and buffer state at an
+ * IRSBufferState<R>.
+ *
+ * When Rejection is true the buffer is sampled directly and out-of-range
+ * records are dropped (so fewer than sample_size records may come back).
+ * When it is false the in-range buffer records are copied into the state
+ * up front and sampled from that copy.
+ *
+ * The class is named IRSQuery so that it matches the forward declaration
+ * and the friend declarations inside MemISAM; without that friendship the
+ * calls to the private MemISAM::get_lower_bound below are not accessible.
+ */
+template <RecordInterface R, bool Rejection=true>
+class IRSQuery {
+public:
+
+    /*
+     * Allocate the shard-side state for a query: the positions returned by
+     * the shard's lower/upper bound searches for the requested key range.
+     * The caller releases it with delete_query_state().
+     */
+    static void *get_query_state(MemISAM<R> *isam, void *parms) {
+        auto p = static_cast<irs_query_parms<R> *>(parms);
+
+        auto res = new IRSState<R>();
+        res->lower_bound = isam->get_lower_bound(p->lower_bound);
+        res->upper_bound = isam->get_upper_bound(p->upper_bound);
+
+        return res;
+    }
+
+    /*
+     * Allocate the buffer-side state for a query. The record count at the
+     * time of the call is stored as the cutoff. Without rejection sampling,
+     * every buffer record below the cutoff whose key lies in
+     * [lower_bound, upper_bound] is also copied into the state.
+     * The caller releases it with delete_buffer_query_state().
+     */
+    static void *get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
+        auto res = new IRSBufferState<R>();
+        res->cutoff = buffer->get_record_count();
+
+        if constexpr (!Rejection) {
+            auto p = static_cast<irs_query_parms<R> *>(parms);
+            auto data = buffer->get_data();
+
+            for (size_t i=0; i<res->cutoff; i++) {
+                auto rec = data + i;
+                // Compare keys on both sides; the key is reached through
+                // the wrapper's `rec` member, as in buffer_query().
+                if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
+                    res->records.emplace_back(*rec);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /*
+     * Draw sample_size records, with replacement, from the shard positions
+     * [state->lower_bound, state->upper_bound). Returns an empty vector when
+     * sample_size is zero or the position range is empty.
+     */
+    static std::vector<Wrapped<R>> query(MemISAM<R> *isam, void *q_state, void *parms) {
+        auto p = static_cast<irs_query_parms<R> *>(parms);
+        auto state = static_cast<IRSState<R> *>(q_state);
+
+        std::vector<Wrapped<R>> result_set;
+
+        // gsl_rng_uniform_int must not be called with n == 0, and the
+        // subtraction below would wrap if the bounds were inverted.
+        if (p->sample_size == 0 || state->upper_bound <= state->lower_bound) {
+            return result_set;
+        }
+
+        const size_t range_length = state->upper_bound - state->lower_bound;
+        result_set.reserve(p->sample_size);
+
+        for (size_t i=0; i<p->sample_size; i++) {
+            // The draw is an offset into the range, so shift it by the
+            // range's starting position before indexing the shard.
+            size_t idx = state->lower_bound + gsl_rng_uniform_int(p->rng, range_length);
+
+            // get_record_at() yields nullptr for an out-of-bounds index.
+            auto rec = isam->get_record_at(idx);
+            if (rec) {
+                result_set.emplace_back(*rec);
+            }
+        }
+
+        return result_set;
+    }
+
+    /*
+     * Draw up to sample_size records from the buffer. With rejection
+     * sampling each draw picks a position below the cutoff and keeps the
+     * record only if its key is in range; otherwise draws come from the
+     * pre-filtered copy stored in the state. Returns an empty vector when
+     * there is nothing to sample from.
+     */
+    static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+        auto st = static_cast<IRSBufferState<R> *>(state);
+        auto p = static_cast<irs_query_parms<R> *>(parms);
+
+        std::vector<Wrapped<R>> result;
+        result.reserve(p->sample_size);
+
+        if constexpr (Rejection) {
+            // An empty buffer would mean drawing from a range of size zero.
+            if (st->cutoff == 0) {
+                return result;
+            }
+
+            for (size_t i=0; i<p->sample_size; i++) {
+                auto idx = gsl_rng_uniform_int(p->rng, st->cutoff);
+                auto rec = buffer->get_data() + idx;
+
+                if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
+                    result.emplace_back(*rec);
+                }
+            }
+        } else {
+            // No in-range records were captured, so nothing can be drawn.
+            if (st->records.empty()) {
+                return result;
+            }
+
+            for (size_t i=0; i<p->sample_size; i++) {
+                auto idx = gsl_rng_uniform_int(p->rng, st->records.size());
+                result.emplace_back(st->records[idx]);
+            }
+        }
+
+        return result;
+    }
+
+    /*
+     * Concatenate the per-source result vectors, in order, into one vector.
+     *
+     * NOTE(review): query() and buffer_query() return Wrapped<R> while this
+     * takes plain R -- presumably the caller unwraps in between; confirm.
+     */
+    static std::vector<R> merge(std::vector<std::vector<R>> &results) {
+        size_t total = 0;
+        for (const auto &partial : results) {
+            total += partial.size();
+        }
+
+        std::vector<R> output;
+        output.reserve(total);
+
+        for (const auto &partial : results) {
+            output.insert(output.end(), partial.begin(), partial.end());
+        }
+
+        return output;
+    }
+
+    /* Release state allocated by get_query_state(). */
+    static void delete_query_state(void *state) {
+        delete static_cast<IRSState<R> *>(state);
+    }
+
+    /* Release state allocated by get_buffer_query_state(). */
+    static void delete_buffer_query_state(void *state) {
+        delete static_cast<IRSBufferState<R> *>(state);
+    }
+};
+
+/*
+ * Compatibility alias for the name this class was originally given.
+ * NOTE(review): nothing in the class uses record weights; drop this alias
+ * once no caller refers to WIRSQuery through this header.
+ */
+template <WeightedRecordInterface R, bool Rejection=true>
+using WIRSQuery = IRSQuery<R, Rejection>;
+
}