summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-02-05 15:17:25 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-02-05 15:17:25 -0500
commitdb4806d9dd9757273a14e6c3ea92e5a087239145 (patch)
tree3766b79180d9d3b2167b0ff8d74cd9e73bfc5298
parentfca660859bd8133cff53592b17abf4c8a51fc2c0 (diff)
downloaddynamic-extension-db4806d9dd9757273a14e6c3ea92e5a087239145.tar.gz
Set up tombstone deletes properly
-rw-r--r--benchmarks/irs_bench.cpp18
-rw-r--r--include/framework/DynamicExtension.h14
-rw-r--r--include/query/irs.h16
-rw-r--r--include/query/rangecount.h14
-rw-r--r--include/query/rangequery.h14
-rw-r--r--tests/include/rangecount.h14
-rw-r--r--tests/include/rangequery.h19
-rw-r--r--tests/rangequery_tests.cpp19
8 files changed, 72 insertions, 56 deletions
diff --git a/benchmarks/irs_bench.cpp b/benchmarks/irs_bench.cpp
index ffedcf2..6de8681 100644
--- a/benchmarks/irs_bench.cpp
+++ b/benchmarks/irs_bench.cpp
@@ -20,7 +20,7 @@
typedef de::Record<int64_t, int64_t> Rec;
typedef de::ISAMTree<Rec> ISAM;
typedef de::irs::Query<ISAM, Rec> Q;
-typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext;
+typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext;
typedef de::irs::Parms<Rec> QP;
void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) {
@@ -30,7 +30,7 @@ void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) {
q->rng = rng;
q->sample_size = 1000;
- auto res = extension->query(&q);
+ auto res = extension->query(q);
auto r = res.get();
total += r.size();
}
@@ -39,14 +39,14 @@ void run_queries(Ext *extension, std::vector<QP> &queries, gsl_rng *rng) {
}
size_t g_deleted_records = 0;
-
-double delete_proportion = 0;
+double delete_proportion = 0.05;
void insert_records(Ext *extension, size_t start,
size_t stop,
std::vector<int64_t> &records,
std::vector<size_t> &to_delete,
size_t &delete_idx,
+ bool delete_records,
gsl_rng *rng) {
size_t reccnt = 0;
Rec r;
@@ -58,7 +58,7 @@ void insert_records(Ext *extension, size_t start,
usleep(1);
}
- if (gsl_rng_uniform(rng) <= delete_proportion && to_delete[delete_idx] <= i) {
+ if (delete_records && gsl_rng_uniform(rng) <= delete_proportion && to_delete[delete_idx] <= i) {
r.key = records[to_delete[delete_idx]];
r.value = (int64_t) (to_delete[delete_idx]);
while (!extension->erase(r)) {
@@ -95,16 +95,16 @@ int main(int argc, char **argv) {
auto queries = read_range_queries<QP>(q_fname, .001);
/* warmup structure w/ 10% of records */
- size_t warmup = .1 * n;
+ size_t warmup = .3 * n;
size_t delete_idx = 0;
- insert_records(extension, 0, warmup, data, to_delete, delete_idx, rng);
+ insert_records(extension, 0, warmup, data, to_delete, delete_idx, false, rng);
extension->await_next_epoch();
TIMER_INIT();
TIMER_START();
- insert_records(extension, warmup, data.size(), data, to_delete, delete_idx, rng);
+ insert_records(extension, warmup, data.size(), data, to_delete, delete_idx, true, rng);
TIMER_STOP();
auto insert_latency = TIMER_RESULT();
@@ -116,7 +116,7 @@ int main(int argc, char **argv) {
auto query_latency = TIMER_RESULT() / queries.size();
- fprintf(stdout, "T\t%ld\t%ld\n", insert_throughput, query_latency);
+ fprintf(stdout, "T\t%ld\t%ld\t%ld\n", insert_throughput, query_latency, g_deleted_records);
gsl_rng_free(rng);
delete extension;
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a56cc6c..3e9d0fb 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -481,7 +481,7 @@ private:
void *parms = args->query_parms;
/* Get the buffer query states */
- void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms);
+ void *buffer_state = Q::get_buffer_query_state(&buffer, parms);
/* Get the shard query states */
std::vector<std::pair<ShardID, Shard*>> shards;
@@ -502,7 +502,7 @@ private:
shid = shards[i - 1].first;
}
- query_results[i] = std::move(filter_deletes(local_results, shid, vers));
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
if constexpr (Q::EARLY_ABORT) {
if (query_results[i].size() > 0) break;
@@ -563,8 +563,8 @@ private:
return m_buffer->append(rec, ts);
}
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers) {
- if constexpr (!Q::SKIP_DELETE_FILTER) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
+ if constexpr (Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -602,6 +602,12 @@ private:
//continue;
//}
+ for (size_t i=0; i<bview->get_record_count(); i++) {
+ if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) {
+ continue;
+ }
+ }
+
if (shid != INVALID_SHID) {
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) {
diff --git a/include/query/irs.h b/include/query/irs.h
index 7ef5069..7eea14b 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -39,9 +39,9 @@ struct BufferState {
size_t cutoff;
std::vector<Wrapped<R>> records;
size_t sample_size;
- BufferView<R> buffer;
+ BufferView<R> *buffer;
- BufferState(BufferView<R> buffer) : buffer(std::move(buffer)) {}
+ BufferState(BufferView<R> *buffer) : buffer(buffer) {}
};
template <ShardInterface S, RecordInterface R, bool Rejection=true>
@@ -68,10 +68,10 @@ public:
return res;
}
- static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
- auto res = new BufferState<R>(std::move(buffer));
+ static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
+ auto res = new BufferState<R>(buffer);
- res->cutoff = res->buffer.get_record_count();
+ res->cutoff = res->buffer->get_record_count();
res->sample_size = 0;
if constexpr (Rejection) {
@@ -82,8 +82,8 @@ public:
auto upper_key = ((Parms<R> *) parms)->upper_bound;
for (size_t i=0; i<res->cutoff; i++) {
- if ((res->buffer.get(i)->rec.key >= lower_key) && (buffer.get(i)->rec.key <= upper_key)) {
- res->records.emplace_back(*(res->buffer.get(i)));
+ if ((res->buffer->get(i)->rec.key >= lower_key) && (buffer->get(i)->rec.key <= upper_key)) {
+ res->records.emplace_back(*(res->buffer->get(i)));
}
}
@@ -181,7 +181,7 @@ public:
if constexpr (Rejection) {
for (size_t i=0; i<st->sample_size; i++) {
auto idx = gsl_rng_uniform_int(p->rng, st->cutoff);
- auto rec = st->buffer.get(idx);
+ auto rec = st->buffer->get(idx);
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
result.emplace_back(*rec);
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 7d88b1d..70d57d8 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -33,10 +33,10 @@ struct State {
template <RecordInterface R>
struct BufferState {
- BufferView<R> buffer;
+ BufferView<R> *buffer;
- BufferState(BufferView<R> buffer)
- : buffer(std::move(buffer)) {}
+ BufferState(BufferView<R> *buffer)
+ : buffer(buffer) {}
};
template <ShardInterface S, KVPInterface R>
@@ -55,8 +55,8 @@ public:
return res;
}
- static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
- auto res = new BufferState<R>(std::move(buffer));
+ static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
+ auto res = new BufferState<R>(buffer);
return res;
}
@@ -123,8 +123,8 @@ public:
res.rec.value = 0; // tombstones
records.emplace_back(res);
- for (size_t i=0; i<s->buffer.get_record_count(); i++) {
- auto rec = s->buffer.get(i);
+ for (size_t i=0; i<s->buffer->get_record_count(); i++) {
+ auto rec = s->buffer->get(i);
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound
&& !rec->is_deleted()) {
if (rec->is_tombstone()) {
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index c44f5d7..1a42265 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -32,10 +32,10 @@ struct State {
template <RecordInterface R>
struct BufferState {
- BufferView<R> buffer;
+ BufferView<R> *buffer;
- BufferState(BufferView<R> buffer)
- : buffer(std::move(buffer)) {}
+ BufferState(BufferView<R> *buffer)
+ : buffer(buffer) {}
};
template <ShardInterface S, RecordInterface R>
@@ -54,8 +54,8 @@ public:
return res;
}
- static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
- auto res = new BufferState<R>(std::move(buffer));
+ static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
+ auto res = new BufferState<R>(buffer);
return res;
}
@@ -101,8 +101,8 @@ public:
auto s = (BufferState<R> *) state;
std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->buffer.get_record_count(); i++) {
- auto rec = s->buffer.get(i);
+ for (size_t i=0; i<s->buffer->get_record_count(); i++) {
+ auto rec = s->buffer->get(i);
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
records.emplace_back(*rec);
}
diff --git a/tests/include/rangecount.h b/tests/include/rangecount.h
index 83bf4d4..e09ab12 100644
--- a/tests/include/rangecount.h
+++ b/tests/include/rangecount.h
@@ -33,6 +33,7 @@
START_TEST(t_range_count)
{
+
auto buffer = create_sequential_mbuffer<Rec>(100, 1000);
auto shard = Shard(buffer->get_buffer_view());
@@ -60,12 +61,15 @@ START_TEST(t_buffer_range_count)
parms.lower_bound = 300;
parms.upper_bound = 500;
- auto state = rc::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms);
- auto result = rc::Query<Shard, Rec>::buffer_query(state, &parms);
- rc::Query<Shard, Rec>::delete_buffer_query_state(state);
+ {
+ auto view = buffer->get_buffer_view();
+ auto state = rc::Query<Shard, Rec>::get_buffer_query_state(&view, &parms);
+ auto result = rc::Query<Shard, Rec>::buffer_query(state, &parms);
+ rc::Query<Shard, Rec>::delete_buffer_query_state(state);
- ck_assert_int_eq(result.size(), 1);
- ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1);
+ ck_assert_int_eq(result.size(), 1);
+ ck_assert_int_eq(result[0].rec.key, parms.upper_bound - parms.lower_bound + 1);
+ }
delete buffer;
}
diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h
index 1ac0891..b9694a4 100644
--- a/tests/include/rangequery.h
+++ b/tests/include/rangequery.h
@@ -64,14 +64,17 @@ START_TEST(t_buffer_range_query)
parms.lower_bound = 300;
parms.upper_bound = 500;
- auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms);
- auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms);
- rq::Query<Shard, Rec>::delete_buffer_query_state(state);
-
- ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
- for (size_t i=0; i<result.size(); i++) {
- ck_assert_int_le(result[i].rec.key, parms.upper_bound);
- ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ {
+ auto view = buffer->get_buffer_view();
+ auto state = rq::Query<Shard, Rec>::get_buffer_query_state(&view, &parms);
+ auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms);
+ rq::Query<Shard, Rec>::delete_buffer_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
}
delete buffer;
diff --git a/tests/rangequery_tests.cpp b/tests/rangequery_tests.cpp
index 78a4e72..c78571c 100644
--- a/tests/rangequery_tests.cpp
+++ b/tests/rangequery_tests.cpp
@@ -53,16 +53,19 @@ START_TEST(t_buffer_range_query)
parms.lower_bound = 300;
parms.upper_bound = 500;
- auto state = rq::Query<Shard, Rec>::get_buffer_query_state(buffer->get_buffer_view(), &parms);
- auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms);
- rq::Query<Shard, Rec>::delete_buffer_query_state(state);
+ {
+ auto view = buffer->get_buffer_view();
+ auto state = rq::Query<Shard, Rec>::get_buffer_query_state(&view, &parms);
+ auto result = rq::Query<Shard, Rec>::buffer_query(state, &parms);
+ rq::Query<Shard, Rec>::delete_buffer_query_state(state);
+
+ ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
+ for (size_t i=0; i<result.size(); i++) {
+ ck_assert_int_le(result[i].rec.key, parms.upper_bound);
+ ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
+ }
- ck_assert_int_eq(result.size(), parms.upper_bound - parms.lower_bound + 1);
- for (size_t i=0; i<result.size(); i++) {
- ck_assert_int_le(result[i].rec.key, parms.upper_bound);
- ck_assert_int_ge(result[i].rec.key, parms.lower_bound);
}
-
delete buffer;
}
END_TEST