diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-05 15:17:25 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-02-05 15:17:25 -0500 |
| commit | db4806d9dd9757273a14e6c3ea92e5a087239145 (patch) | |
| tree | 3766b79180d9d3b2167b0ff8d74cd9e73bfc5298 | |
| parent | fca660859bd8133cff53592b17abf4c8a51fc2c0 (diff) | |
| download | dynamic-extension-db4806d9dd9757273a14e6c3ea92e5a087239145.tar.gz | |
Set up tombstone deletes properly
| -rw-r--r-- | benchmarks/irs_bench.cpp | 18 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 14 | ||||
| -rw-r--r-- | include/query/irs.h | 16 | ||||
| -rw-r--r-- | include/query/rangecount.h | 14 | ||||
| -rw-r--r-- | include/query/rangequery.h | 14 | ||||
| -rw-r--r-- | tests/include/rangecount.h | 14 | ||||
| -rw-r--r-- | tests/include/rangequery.h | 19 | ||||
| -rw-r--r-- | tests/rangequery_tests.cpp | 19 |
8 files changed, 72 insertions, 56 deletions
diff --git a/benchmarks/irs_bench.cpp b/benchmarks/irs_bench.cpp index ffedcf2..6de8681 100644 --- a/benchmarks/irs_bench.cpp +++ b/benchmarks/irs_bench.cpp @@ -20,7 +20,7 @@ typedef de::Record<int64_t, int64_t> Rec; typedef de::ISAMTree<Rec> ISAM; typedef de::irs::Query<ISAM, Rec> Q; -typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext; +typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; typedef de::irs::Parms<Rec> QP; void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) { @@ -30,7 +30,7 @@ void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) { q->rng = rng; q->sample_size = 1000; - auto res = extension->query(&q); + auto res = extension->query(q); auto r = res.get(); total += r.size(); } @@ -39,14 +39,14 @@ void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) { } size_t g_deleted_records = 0; - -double delete_proportion = 0; +double delete_proportion = 0.05; void insert_records(Ext *extension, size_t start, size_t stop, std::vector<int64_t> &records, std::vector<size_t> &to_delete, size_t &delete_idx, + bool delete_records, gsl_rng *rng) { size_t reccnt = 0; Rec r; @@ -58,7 +58,7 @@ void insert_records(Ext *extension, size_t start, usleep(1); } - if (gsl_rng_uniform(rng) <= delete_proportion && to_delete[delete_idx] <= i) { + if (delete_records && gsl_rng_uniform(rng) <= delete_proportion && to_delete[delete_idx] <= i) { r.key = records[to_delete[delete_idx]]; r.value = (int64_t) (to_delete[delete_idx]); while (!extension->erase(r)) { @@ -95,16 +95,16 @@ int main(int argc, char **argv) { auto queries = read_range_queries<QP>(q_fname, .001); /* warmup structure w/ 10% of records */ - size_t warmup = .1 * n; + size_t warmup = .3 * n; size_t delete_idx = 0; - insert_records(extension, 0, warmup, data, to_delete, delete_idx, rng); + insert_records(extension, 0, warmup, data, to_delete, delete_idx, false, rng); extension->await_next_epoch(); TIMER_INIT(); TIMER_START(); - insert_records(extension, warmup, data.size(), data, to_delete, delete_idx, rng); + insert_records(extension, warmup, data.size(), data, to_delete, delete_idx, true, rng); TIMER_STOP(); auto insert_latency = TIMER_RESULT(); @@ -116,7 +116,7 @@ int main(int argc, char **argv) { auto query_latency = TIMER_RESULT() / queries.size(); - fprintf(stdout, "T\t%ld\t%ld\n", insert_throughput, query_latency); + fprintf(stdout, "T\t%ld\t%ld\t%ld\n", insert_throughput, query_latency, g_deleted_records); gsl_rng_free(rng); delete extension; diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a56cc6c..3e9d0fb 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -481,7 +481,7 @@ private: void *parms = args->query_parms; /* Get the buffer query states */ - void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms); + void *buffer_state = Q::get_buffer_query_state(&buffer, parms); /* Get the shard query states */ std::vector<std::pair<ShardID, Shard*>> shards; @@ -502,7 +502,7 @@ private: shid = shards[i - 1].first; } - query_results[i] = std::move(filter_deletes(local_results, shid, vers)); + query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); if constexpr (Q::EARLY_ABORT) { if (query_results[i].size() > 0) break; @@ -563,8 +563,8 @@ private: return m_buffer->append(rec, ts); } - static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers) { - if constexpr (!Q::SKIP_DELETE_FILTER) { + static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) { + if constexpr (Q::SKIP_DELETE_FILTER) { return records; } @@ -602,6 +602,12 @@ private: //continue; //} + for (size_t i=0; i<bview->get_record_count(); i++) { + if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) { + continue; + } + } + if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) { diff --git a/include/query/irs.h b/include/query/irs.h index 7ef5069..7eea14b 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -39,9 +39,9 @@ struct BufferState { size_t cutoff; std::vector<Wrapped<R>> records; size_t sample_size; - BufferView<R> buffer; + BufferView<R> *buffer; - BufferState(BufferView<R> buffer) : buffer(std::move(buffer)) {} + BufferState(BufferView<R> *buffer) : buffer(buffer) {} }; template <ShardInterface S, RecordInterface R, bool Rejection=true> @@ -68,10 +68,10 @@ public: return res; } - static void* get_buffer_query_state(BufferView<R> buffer, void *parms) { - auto res = new BufferState<R>(std::move(buffer)); + static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { + auto res = new BufferState<R>(buffer); - res->cutoff = res->buffer.get_record_count(); + res->cutoff = res->buffer->get_record_count(); res->sample_size = 0; if constexpr (Rejection) { @@ -82,8 +82,8 @@ public: auto upper_key = ((Parms<R> *) parms)->upper_bound; for (size_t i=0; i<res->cutoff; i++) { - if ((res->buffer.get(i)->rec.key >= lower_key) && (buffer.get(i)->rec.key <= upper_key)) { - res->records.emplace_back(*(res->buffer.get(i))); + if ((res->buffer->get(i)->rec.key >= lower_key) && (buffer->get(i)->rec.key <= upper_key)) { + res->records.emplace_back(*(res->buffer->get(i))); } } @@ -181,7 +181,7 @@ public: if constexpr (Rejection) { for (size_t i=0; i<st->sample_size; i++) { auto idx = gsl_rng_uniform_int(p->rng, st->cutoff); - auto rec = st->buffer.get(idx); + auto rec = st->buffer->get(idx); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { result.emplace_back(*rec); diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 7d88b1d..70d57d8 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -33,10 +33,10 @@ struct State { template <RecordInterface R> struct BufferState { - BufferView<R> buffer; + BufferView<R> *buffer; - BufferState(BufferView<R> buffer) - : buffer(std::move(buffer)) {} + BufferState(BufferView<R> *buffer) + : buffer(buffer) {} }; template <ShardInterface S, KVPInterface R> @@ -55,8 +55,8 @@ public: return res; } - static void* get_buffer_query_state(BufferView<R> buffer, void *parms) { - auto res = new BufferState<R>(std::move(buffer)); + static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { + auto res = new BufferState<R>(buffer); return res; } @@ -123,8 +123,8 @@ public: res.rec.value = 0; // tombstones records.emplace_back(res); - for (size_t i=0; i<s->buffer.get_record_count(); i++) { - auto rec = s->buffer.get(i); + for (size_t i=0; i<s->buffer->get_record_count(); i++) { + auto rec = s->buffer->get(i); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound && !rec->is_deleted()) { if (rec->is_tombstone()) { diff --git a/include/query/rangequery.h b/include/query/rangequery.h index c44f5d7..1a42265 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -32,10 +32,10 @@ struct State { template <RecordInterface R> struct BufferState { - BufferView<R> buffer; + BufferView<R> *buffer; - BufferState(BufferView<R> buffer) - : buffer(std::move(buffer)) {} + BufferState(BufferView<R> *buffer) + : buffer(buffer) {} }; template <ShardInterface S, RecordInterface R> @@ -54,8 +54,8 @@ public: return res; } - static void* get_buffer_query_state(BufferView<R> buffer, void *parms) { - auto res = new BufferState<R>(std::move(buffer)); + static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) { + auto res = new BufferState<R>(buffer); return res; } @@ -101,8 +101,8 @@ public: auto s = (BufferState<R> *) state; std::vector<Wrapped<R>> records; - for (size_t i=0; i<s->buffer.get_record_count(); i++) { - auto rec = s->buffer.get(i); + for (size_t i=0; i<s->buffer->get_record_count(); i++) { + auto rec = s->buffer->get(i); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { records.emplace_back(*rec); } diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h index 83bf4d4..e09ab12 100644 --- a/tests/include/rangecount.h +++ b/tests/include/rangecount.h @@ -33,6 +33,7 @@ START_TEST(t_range_count) { + auto buffer = create_sequential_mbuffer<Rec>(100, 1000); auto shard = Shard(buffer->get_buffer_view()); @@ -60,12 +61,15 @@ START_TEST(t_buffer_range_count) parms.lower_bound = 300; parms.upper_bound = 500; - auto state = rc::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms); - auto result = rc::Query<Shard, Rec>::buffer_query(state, &parms); - rc::Query<Shard, Rec>::delete_buffer_query_state(state); + { + auto view = buffer->get_buffer_view(); + auto state = rc::Query<Shard, Rec>::get_buffer_query_state(&view, &parms); + auto result = rc::Query<Shard, Rec>::buffer_query(state, &parms); + rc::Query<Shard, Rec>::delete_buffer_query_state(state); - ck_assert_int_eq(result.size(), 1); - ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1); + ck_assert_int_eq(result.size(), 1); + ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1); + } delete buffer; } diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h index 1ac0891..b9694a4 100644 --- a/tests/include/rangequery.h +++ b/tests/include/rangequery.h @@ -64,14 +64,17 @@ START_TEST(t_buffer_range_query) parms.lower_bound = 300; parms.upper_bound = 500; - auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms); - auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms); - rq::Query<Shard, Rec>::delete_buffer_query_state(state); - - ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + { + auto view = buffer->get_buffer_view(); + auto state = rq::Query<Shard, Rec>::get_buffer_query_state(&view, &parms); + auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms); + rq::Query<Shard, Rec>::delete_buffer_query_state(state); + + ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); + for (size_t i=0; i<result.size(); i++) { + ck_assert_int_le(result[i].rec.key, parms.upper_bound); + ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + } } delete buffer; diff --git a/tests/rangequery_tests.cpp b/tests/rangequery_tests.cpp index 78a4e72..c78571c 100644 --- a/tests/rangequery_tests.cpp +++ b/tests/rangequery_tests.cpp @@ -53,16 +53,19 @@ START_TEST(t_buffer_range_query) parms.lower_bound = 300; parms.upper_bound = 500; - auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms); - auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms); - rq::Query<Shard, Rec>::delete_buffer_query_state(state); + { + auto view = buffer->get_buffer_view(); + auto state = rq::Query<Shard, Rec>::get_buffer_query_state(&view, &parms); + auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms); + rq::Query<Shard, Rec>::delete_buffer_query_state(state); + + ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); + for (size_t i=0; i<result.size(); i++) { + ck_assert_int_le(result[i].rec.key, parms.upper_bound); + ck_assert_int_ge(result[i].rec.key, parms.lower_bound); + } - ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1); - for (size_t i=0; i<result.size(); i++) { - ck_assert_int_le(result[i].rec.key, parms.upper_bound); - ck_assert_int_ge(result[i].rec.key, parms.lower_bound); } - delete buffer; } END_TEST |