From 138c793b0a58577713d98c98bb140cf1d9c79bee Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 17 Jan 2024 18:22:00 -0500 Subject: Multiple concurrency bug fixes A poorly organized commit with fixes for a variety of bugs that were causing missing records. The core problems all appear to be fixed, though there is an outstanding problem with tombstones not being completely canceled. A very small number are appearing in the wrong order during the static structure test. --- CMakeLists.txt | 7 ++- include/framework/DynamicExtension.h | 91 ++++++++++------------------- include/framework/interface/Shard.h | 5 +- include/framework/scheduling/Epoch.h | 51 ++++++++-------- include/framework/structure/BufferView.h | 2 +- include/framework/structure/InternalLevel.h | 26 ++++----- include/framework/structure/MutableBuffer.h | 58 +++++++++++------- include/query/rangequery.h | 5 +- include/shard/ISAMTree.h | 35 ++++++----- tests/de_tier_concurrent.cpp | 57 ++++++++++++++++++ tests/include/concurrent_extension.h | 54 +++++++++++------ tests/include/rangequery.h | 25 ++++---- tests/include/shard_standard.h | 8 +-- tests/rangequery_tests.cpp | 7 +-- 14 files changed, 246 insertions(+), 185 deletions(-) create mode 100644 tests/de_tier_concurrent.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f7cf90..e47ca4e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug false) +set(debug true) set(tests True) set(bench true) set(old_bench False) @@ -80,6 +80,11 @@ if (tests) target_link_libraries(de_level_concurrent PUBLIC gsl check subunit pthread atomic) target_link_options(de_level_concurrent PUBLIC -mcx16) target_include_directories(de_level_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) + + add_executable(de_tier_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_concurrent.cpp) + target_link_libraries(de_tier_concurrent PUBLIC gsl check subunit pthread atomic) + target_link_options(de_tier_concurrent PUBLIC -mcx16) + target_include_directories(de_tier_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread atomic) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index cb21ae3..7590de2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -49,7 +49,7 @@ public: , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, m_buffer); + auto epoch = new _Epoch(0, vers, m_buffer, 0); m_versions.insert(vers); m_epochs.insert({0, epoch}); @@ -169,6 +169,15 @@ public: auto vers = epoch->get_structure(); std::vector shards; + + if (vers->get_levels().size() > 0) { + for (int i=vers->get_levels().size() - 1; i>= 0; i--) { + if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { + shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); + } + } + } + /* * construct a shard from the buffer view. We'll hold the view * for as short a time as possible: once the records are exfiltrated @@ -182,24 +191,7 @@ public: } } - if (vers->get_levels().size() > 0) { - for (int i=vers->get_levels().size() - 1; i>= 0; i--) { - if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) { - shards.emplace_back(vers->get_levels()[i]->get_combined_shard()); - } - } - } - - Shard *shards_array[shards.size()]; - - size_t j = 0; - for (size_t i=0; i) { - size_t old_buffer_cnt = new_epoch->clear_buffers(); - - /* - * skip the first buffer, as this was flushed into the epoch's - * structure already, and copy all the other buffer references - * into the new epoch - */ - for (size_t i=1; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers()[i]); - } - } - #endif + // FIXME: this may currently fail because there isn't any + // query preemption yet. At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + auto res = new_epoch->advance_buffer_head(buffer_head); + assert(res); m_current_epoch.fetch_add(1); old_epoch->set_inactive(); @@ -425,40 +405,29 @@ private: } static void reconstruction(void *arguments) { - ReconstructionArgs *args = (ReconstructionArgs *) arguments; - + auto args = (ReconstructionArgs *) arguments; Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; imerges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } - /* - * we'll grab the buffer AFTER doing the internal reconstruction, so we can - * flush as many records as possible in one go. The reconstruction was done so - * as to make room for the full buffer anyway, so there's no real benefit to doing - * this first. + /* + * we'll grab the buffer AFTER doing the internal reconstruction, so we + * can flush as many records as possible in one go. The reconstruction + * was done so as to make room for the full buffer anyway, so there's + * no real benefit to doing this first. */ auto buffer_view = args->epoch->get_buffer(); size_t new_head = buffer_view.get_tail(); - /* - * if performing a compaction, don't flush the buffer, as - * there is no guarantee that any necessary reconstructions + /* + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions * will free sufficient space in L0 to support a flush */ if (!args->compaction) { vers->flush_buffer(std::move(buffer_view)); - - // FIXME: this may currently fail because there isn't any - // query preemption yet. At this point, we'd need to either - // 1) wait for all queries on the old_head to finish - // 2) kill all queries on the old_head - // 3) somehow migrate all queries on the old_head to the new - // version - auto res = args->epoch->advance_buffer_head(new_head); - assert(res); } args->epoch->end_job(); @@ -470,7 +439,7 @@ private: * part of a compaction */ if (!args->compaction) { - ((DynamicExtension *) args->extension)->advance_epoch(); + ((DynamicExtension *) args->extension)->advance_epoch(new_head); } ((DynamicExtension *) args->extension)->m_reconstruction_scheduled = false; diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h index 2357795..8c4db34 100644 --- a/include/framework/interface/Shard.h +++ b/include/framework/interface/Shard.h @@ -12,6 +12,7 @@ #include "util/types.h" #include "framework/interface/Record.h" +#include namespace de { @@ -19,8 +20,8 @@ namespace de { // determining a good way to handle additional template arguments // to get the Record type into play template -concept ShardInterface = requires(S s, S **spp, void *p, bool b, size_t i) { - {S(spp, i)}; +concept ShardInterface = requires(S s, std::vector spp, void *p, bool b, size_t i) { + {S(spp)}; /* {S(mutable buffer)} {s.point_lookup(r, b) } -> std::convertible_to diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index b005ff6..45ee17d 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -32,15 +32,17 @@ public: , m_active_jobs(0) , m_active(true) , m_epoch_number(number) + , m_buffer_head(0) {} - Epoch(size_t number, Structure *structure, Buffer *buff) + Epoch(size_t number, Structure *structure, Buffer *buff, size_t head) : m_buffer(buff) , m_structure(structure) , m_active_jobs(0) , m_active_merge(false) , m_active(true) , m_epoch_number(number) + , m_buffer_head(head) { structure->take_reference(); } @@ -48,22 +50,21 @@ public: ~Epoch() { assert(m_active_jobs.load() == 0); - /* FIXME: this is needed to keep the destructor from - * sometimes locking up here. But there *shouldn't* be - * any threads waiting on this signal at object destruction, - * so something else is going on here that needs looked into + /* FIXME: this is needed to keep the destructor from sometimes locking + * up here. But there *shouldn't* be any threads waiting on this signal + * at object destruction, so something else is going on here that needs + * looked into */ - //m_active_cv.notify_all(); + // m_active_cv.notify_all(); if (m_structure) { m_structure->release_reference(); } } - - /* - * Epochs are *not* copyable or movable. Only one can exist, and all users of - * it work with pointers + /* + * Epochs are *not* copyable or movable. Only one can exist, and all users + * of it work with pointers */ Epoch(const Epoch&) = delete; Epoch(Epoch&&) = delete; @@ -97,23 +98,20 @@ public: } BufView get_buffer() { - return m_buffer->get_buffer_view(); - } - - BufView get_flush_buffer() { - return m_buffer->get_flush_buffer_view(); + return m_buffer->get_buffer_view(m_buffer_head); } - /* - * Returns a new Epoch object that is a copy of this one. The new object will also contain - * a copy of the m_structure, rather than a reference to the same one. The epoch number of - * the new epoch will be set to the provided argument. + * Returns a new Epoch object that is a copy of this one. The new object + * will also contain a copy of the m_structure, rather than a reference to + * the same one. The epoch number of the new epoch will be set to the + * provided argument. */ Epoch *clone(size_t number) { std::unique_lock m_buffer_lock; auto epoch = new Epoch(number); epoch->m_buffer = m_buffer; + epoch->m_buffer_head = m_buffer_head; if (m_structure) { epoch->m_structure = m_structure->copy(); @@ -125,12 +123,10 @@ public: } /* - * Check if a merge can be started from this Epoch. - * At present, without concurrent merging, this simply - * checks if there is currently a scheduled merge based - * on this Epoch. If there is, returns false. If there - * isn't, return true and set a flag indicating that - * there is an active merge. + * Check if a merge can be started from this Epoch. At present, without + * concurrent merging, this simply checks if there is currently a scheduled + * merge based on this Epoch. If there is, returns false. If there isn't, + * return true and set a flag indicating that there is an active merge. */ bool prepare_reconstruction() { auto old = m_active_merge.load(); @@ -176,7 +172,8 @@ public: } bool advance_buffer_head(size_t head) { - return m_buffer->advance_head(head); + m_buffer_head = head; + return m_buffer->advance_head(m_buffer_head); } private: @@ -187,7 +184,6 @@ private: std::mutex m_cv_lock; std::mutex m_buffer_lock; - std::atomic m_active_merge; /* @@ -199,5 +195,6 @@ private: std::atomic m_active_jobs; bool m_active; size_t m_epoch_number; + size_t m_buffer_head; }; } diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 099b7a2..30fffed 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -110,7 +110,7 @@ public: size_t split_idx = m_cap - (m_head % m_cap); memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), split_idx* sizeof(Wrapped)); - memcpy(buffer + split_idx, (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped)); + memcpy(buffer + (split_idx * sizeof(Wrapped)), (std::byte*) m_data, (get_record_count() - split_idx) * sizeof(Wrapped)); } else { memcpy(buffer, (std::byte*) (m_data + (m_head % m_cap)), get_record_count() * sizeof(Wrapped)); } diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index b35cadd..e9874e0 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -51,11 +51,10 @@ public: assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; - Shard* shards[2]; - shards[0] = base_level->m_shards[0].get(); - shards[1] = new_level->m_shards[0].get(); + std::vector shards = {base_level->m_shards[0].get(), + new_level->m_shards[0].get()}; - res->m_shards[0] = std::make_shared(shards, 2); + res->m_shards[0] = std::make_shared(shards); return std::shared_ptr(res); } @@ -75,17 +74,17 @@ public: return; } - Shard *shards[level->m_shard_cnt]; - for (size_t i=0; im_shard_cnt; i++) { - shards[i] = level->m_shards[i].get(); + std::vector shards; + for (auto shard : level->m_shards) { + if (shard) shards.emplace_back(shard.get()); } if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new S(shards, level->m_shard_cnt); + m_pending_shard = new S(shards); return; } - auto tmp = new S(shards, level->m_shard_cnt); + auto tmp = new S(shards); m_shards[m_shard_cnt] = std::shared_ptr(tmp); ++m_shard_cnt; @@ -131,13 +130,12 @@ public: return nullptr; } - Shard *shards[m_shard_cnt]; - - for (size_t i=0; i shards; + for (auto shard : m_shards) { + if (shard) shards.emplace_back(shard.get()); } - return new S(shards, m_shard_cnt); + return new S(shards); } void get_query_states(std::vector> &shards, std::vector& shard_states, void *query_parms) { diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index eeb3dc9..7edde2f 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -133,18 +133,18 @@ public: return m_tombstone_filter->get_memory_usage(); } - BufferView get_buffer_view() { - size_t head = get_head(); + BufferView get_buffer_view(size_t target_head) { + size_t head = get_head(target_head); auto f = std::bind(release_head_reference, (void *) this, head); return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } - BufferView get_flush_buffer_view() { - size_t head = get_head(); + BufferView get_buffer_view() { + size_t head = get_head(m_head.load().head_idx); auto f = std::bind(release_head_reference, (void *) this, head); - return BufferView(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f); + return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f); } /* @@ -167,23 +167,39 @@ public: buffer_head new_hd = {new_head, 0}; buffer_head cur_hd; - /* move the current head into the old head */ + /* replace current head with new head */ do { - buffer_head cur_hd = m_head.load(); - m_old_head.store(cur_hd); + cur_hd = m_head.load(); } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + /* move the current head into the old head */ + m_old_head.store(cur_hd); + m_active_head_advance.store(false); return true; } - size_t get_head() { + /* + * FIXME: If target_head does not match *either* the old_head or the + * current_head, this routine will loop infinitely. + */ + size_t get_head(size_t target_head) { buffer_head cur_hd, new_hd; + bool head_acquired = false; do { - cur_hd = m_head.load(); - new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; - } while(!m_head.compare_exchange_strong(cur_hd, new_hd)); + if (m_old_head.load().head_idx == target_head) { + cur_hd = m_old_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd); + } else if (m_head.load().head_idx == target_head){ + cur_hd = m_head.load(); + cur_hd.head_idx = target_head; + new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1}; + head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd); + } + } while(!head_acquired); return new_hd.head_idx; } @@ -254,22 +270,22 @@ private: buffer_head cur_hd, new_hd; do { - if (buffer->m_head.load().head_idx == head) { - cur_hd = buffer->m_head; - assert(cur_hd.refcnt > 0); + if (buffer->m_old_head.load().head_idx == head) { + cur_hd = buffer->m_old_head; + if (cur_hd.refcnt == 0) continue; new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - - if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { + if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { break; } } else { - cur_hd = buffer->m_old_head; - assert(cur_hd.refcnt > 0); + cur_hd = buffer->m_head; + if (cur_hd.refcnt == 0) continue; new_hd = {cur_hd.head_idx, cur_hd.refcnt-1}; - if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) { + + if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) { break; } - } + } _mm_pause(); } while(true); } diff --git a/include/query/rangequery.h b/include/query/rangequery.h index ad5b767..c44f5d7 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -84,11 +84,11 @@ public: * roll the pointer forward to the first record that is * greater than or equal to the lower bound. */ - while(ptr->rec.key < p->lower_bound) { + while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) { ptr++; } - while (ptr->rec.key <= p->upper_bound && ptr < shard->get_data() + s->stop_idx) { + while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) { records.emplace_back(*ptr); ptr++; } @@ -152,6 +152,7 @@ public: } else { auto& cursor = cursors[tmp_n - now.version - 1]; if (!now.data->is_tombstone()) output.push_back(cursor.ptr->rec); + pq.pop(); if (advance_cursor>(cursor)) pq.push(cursor.ptr, now.version); diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index 6b2f6b5..932e767 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -62,10 +62,13 @@ public: { TIMER_INIT(); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), (byte**) &m_data); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + buffer.get_record_count() * + sizeof(Wrapped), + (byte**) &m_data); TIMER_START(); - auto temp_buffer = (Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped)); + auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); buffer.copy_to_buffer((byte *) temp_buffer); auto base = temp_buffer; @@ -99,6 +102,7 @@ public: base++; } + TIMER_STOP(); auto copy_time = TIMER_RESULT(); @@ -112,7 +116,7 @@ public: free(temp_buffer); } - ISAMTree(ISAMTree** runs, size_t len) + ISAMTree(std::vector &shards) : m_bf(nullptr) , m_isam_nodes(nullptr) , m_root(nullptr) @@ -124,19 +128,19 @@ public: , m_data(nullptr) { std::vector>> cursors; - cursors.reserve(len); + cursors.reserve(shards.size()); - PriorityQueue> pq(len); + PriorityQueue> pq(shards.size()); size_t attemp_reccnt = 0; size_t tombstone_count = 0; - for (size_t i = 0; i < len; ++i) { - if (runs[i]) { - auto base = runs[i]->get_data(); - cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()}); - attemp_reccnt += runs[i]->get_record_count(); - tombstone_count += runs[i]->get_tombstone_count(); + for (size_t i = 0; i < shards.size(); ++i) { + if (shards[i]) { + auto base = shards[i]->get_data(); + cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()}); + attemp_reccnt += shards[i]->get_record_count(); + tombstone_count += shards[i]->get_tombstone_count(); pq.push(cursors[i].ptr, i); } else { cursors.emplace_back(Cursor>{nullptr, nullptr, 0, 0}); @@ -144,10 +148,9 @@ public: } m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); - - m_alloc_size = (attemp_reccnt * sizeof(Wrapped)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped)) % CACHELINE_SIZE); - assert(m_alloc_size % CACHELINE_SIZE == 0); - m_data = (Wrapped*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size); + m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, + attemp_reccnt * sizeof(Wrapped), + (byte **) &m_data); while (pq.size()) { auto now = pq.peek(); @@ -165,6 +168,8 @@ public: if (!cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; if (cursor.ptr->is_tombstone()) { + //fprintf(stderr, "ISAM: Tombstone from shard %ld next record from shard %ld\n", + //now.version, next.version); ++m_tombstone_cnt; m_bf->insert(cursor.ptr->rec); } diff --git a/tests/de_tier_concurrent.cpp b/tests/de_tier_concurrent.cpp new file mode 100644 index 0000000..9387b21 --- /dev/null +++ b/tests/de_tier_concurrent.cpp @@ -0,0 +1,57 @@ +/* + * tests/de_level_tomb.cpp + * + * Unit tests for Dynamic Extension Framework + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * Distributed under the Modified BSD License. + * + */ +#include +#include +#include + +#include "include/testing.h" +#include "framework/DynamicExtension.h" +#include "shard/ISAMTree.h" +#include "query/rangequery.h" + +#include +using namespace de; + +typedef DynamicExtension, rq::Query, Rec>, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; + +#include "include/concurrent_extension.h" + + +Suite *unit_testing() +{ + Suite *unit = suite_create("DynamicExtension: Tombstone Leveling Testing"); + inject_dynamic_extension_tests(unit); + + return unit; +} + + +int shard_unit_tests() +{ + int failed = 0; + Suite *unit = unit_testing(); + SRunner *unit_shardner = srunner_create(unit); + + srunner_run_all(unit_shardner, CK_NORMAL); + failed = srunner_ntests_failed(unit_shardner); + srunner_free(unit_shardner); + + return failed; +} + + +int main() +{ + int unit_failed = shard_unit_tests(); + + return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h index 86f8e12..a0e71c9 100644 --- a/tests/include/concurrent_extension.h +++ b/tests/include/concurrent_extension.h @@ -28,8 +28,9 @@ #include "shard/ISAMTree.h" #include "query/rangequery.h" #include -using namespace de; -typedef DynamicExtension, rq::Query, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; + +//using namespace de; +//typedef DynamicExtension, rq::Query, Rec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; START_TEST(t_create) @@ -75,7 +76,7 @@ START_TEST(t_debug_insert) for (size_t i=0; i<1000; i++) { Rec r = {key, val}; ck_assert_int_eq(test_de->insert(r), 1); - //ck_assert_int_eq(test_de->get_record_count(), i+1); + ck_assert_int_eq(test_de->get_record_count(), i+1); key++; val++; } @@ -115,14 +116,15 @@ START_TEST(t_insert_with_mem_merges) r.key++; r.value++; cnt++; + ck_assert_int_eq(test_de->get_record_count(), cnt + 1000); } else { - sleep(1); + _mm_pause(); } - } while (cnt < 10000); + } while (cnt < 100000); test_de->await_next_epoch(); - ck_assert_int_eq(test_de->get_record_count(), 11000); + ck_assert_int_eq(test_de->get_record_count(), 101000); delete test_de; } @@ -131,12 +133,12 @@ END_TEST START_TEST(t_range_query) { - auto test_de = new DE(100, 1000, 2); - size_t n = 10000; + auto test_de = new DE(1000, 10000, 4); + size_t n = 10000000; std::vector keys; for (size_t i=0; iinsert(r)) { i++; } else { - sleep(1); + _mm_pause(); } } + test_de->await_next_epoch(); std::sort(keys.begin(), keys.end()); @@ -166,9 +169,12 @@ START_TEST(t_range_query) p.lower_bound = lower_key; p.upper_bound = upper_key; + fprintf(stderr, "query start\n"); auto result = test_de->query(&p); auto r = result.get(); + fprintf(stderr, "query stop\n"); std::sort(r.begin(), r.end()); + ck_assert_int_eq(r.size(), 251); for (size_t i=0; iinsert(r)) { - sleep(1); + _mm_pause(); } if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) { @@ -215,7 +221,7 @@ START_TEST(t_tombstone_merging_01) for (size_t i=0; ierase(dr)) { - sleep(1); + _mm_pause(); } deletes++; to_delete.erase(del_vec[i]); @@ -307,7 +313,7 @@ START_TEST(t_static_structure) for (auto rec : records) { k++; while (!test_de->insert(rec)) { - sleep(1); + _mm_pause(); } t_reccnt++; @@ -316,8 +322,8 @@ START_TEST(t_static_structure) std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()}); for (size_t i=0; ierase(del_vec[1])) { - sleep(1); + while (!test_de->erase(del_vec[i])) { + _mm_pause(); } deletes++; @@ -331,12 +337,23 @@ START_TEST(t_static_structure) } } - auto flat = test_de->create_static_structure(); - ck_assert_int_eq(flat->get_record_count(), reccnt - deletes); + + //fprintf(stderr, "Tombstones: %ld\tRecords: %ld\n", test_de->get_tombstone_count(), test_de->get_record_count()); + //fprintf(stderr, "Inserts: %ld\tDeletes:%ld\tNet:%ld\n", reccnt, deletes, reccnt - deletes); + + auto flat = test_de->create_static_structure(true); + //fprintf(stderr, "Flat: Tombstones: %ld\tRecords %ld\n", flat->get_tombstone_count(), flat->get_record_count()); + //ck_assert_int_eq(flat->get_record_count(), reccnt - deletes); uint64_t prev_key = 0; for (size_t i=0; iget_record_count(); i++) { auto k = flat->get_record_at(i)->rec.key; + if (flat->get_record_at(i)->is_tombstone()) { + fprintf(stderr, "%ld %ld %ld\n", flat->get_record_at(i-1)->rec.key, + flat->get_record_at(i)->rec.key, + flat->get_record_at(i+1)->rec.key); + } + // ck_assert(!flat->get_record_at(i)->is_tombstone()); ck_assert_int_ge(k, prev_key); prev_key = k; } @@ -360,9 +377,9 @@ static void inject_dynamic_extension_tests(Suite *suite) { tcase_set_timeout(insert, 500); suite_add_tcase(suite, insert); - /* TCase *query = tcase_create("de::DynamicExtension::range_query Testing"); tcase_add_test(query, t_range_query); + tcase_set_timeout(query, 500); suite_add_tcase(suite, query); @@ -375,5 +392,4 @@ static void inject_dynamic_extension_tests(Suite *suite) { tcase_add_test(flat, t_static_structure); tcase_set_timeout(flat, 500); suite_add_tcase(suite, flat); - */ } diff --git a/tests/include/rangequery.h b/tests/include/rangequery.h index 3c7e7e0..e45de57 100644 --- a/tests/include/rangequery.h +++ b/tests/include/rangequery.h @@ -24,12 +24,12 @@ * should be included in the source file that includes this one, above the * include statement. */ -//#include "shard/ISAMTree.h" -//#include "query/rangequery.h" -//#include "testing.h" -//#include -//using namespace de; -//typedef ISAMTree Shard; +#include "shard/ISAMTree.h" +#include "query/rangequery.h" +#include "testing.h" +#include +using namespace de; +typedef ISAMTree Shard; START_TEST(t_range_query) @@ -137,15 +137,12 @@ START_TEST(t_lower_bound) auto buffer1 = create_sequential_mbuffer(100, 200); auto buffer2 = create_sequential_mbuffer(400, 1000); - Shard *shards[2]; + auto shard1 = new Shard(buffer1->get_buffer_view()); + auto shard2 = new Shard(buffer2->get_buffer_view()); - auto shard1 = Shard(buffer1->get_buffer_view()); - auto shard2 = Shard(buffer2->get_buffer_view()); - - shards[0] = &shard1; - shards[1] = &shard2; + std::vector shards = {shard1, shard2}; - auto merged = Shard(shards, 2); + auto merged = Shard(shards); for (size_t i=100; i<1000; i++) { Rec r; @@ -167,6 +164,8 @@ START_TEST(t_lower_bound) delete buffer1; delete buffer2; + delete shard1; + delete shard2; } END_TEST diff --git a/tests/include/shard_standard.h b/tests/include/shard_standard.h index 047a7b5..ddd7614 100644 --- a/tests/include/shard_standard.h +++ b/tests/include/shard_standard.h @@ -65,8 +65,8 @@ START_TEST(t_shard_init) auto shard2 = new Shard(mbuffer2->get_buffer_view()); auto shard3 = new Shard(mbuffer3->get_buffer_view()); - Shard* shards[3] = {shard1, shard2, shard3}; - auto shard4 = new Shard(shards, 3); + std::vector shards = {shard1, shard2, shard3}; + auto shard4 = new Shard(shards); ck_assert_int_eq(shard4->get_record_count(), n * 3); ck_assert_int_eq(shard4->get_tombstone_count(), 0); @@ -119,9 +119,9 @@ START_TEST(t_full_cancelation) ck_assert_int_eq(shard_ts->get_record_count(), n); ck_assert_int_eq(shard_ts->get_tombstone_count(), n); - Shard* shards[] = {shard, shard_ts}; + std::vector shards = {shard, shard_ts}; - Shard* merged = new Shard(shards, 2); + Shard* merged = new Shard(shards); ck_assert_int_eq(merged->get_tombstone_count(), 0); ck_assert_int_eq(merged->get_record_count(), 0); diff --git a/tests/rangequery_tests.cpp b/tests/rangequery_tests.cpp index 6a00f5a..78a4e72 100644 --- a/tests/rangequery_tests.cpp +++ b/tests/rangequery_tests.cpp @@ -125,15 +125,12 @@ START_TEST(t_lower_bound) auto buffer1 = create_sequential_mbuffer(100, 200); auto buffer2 = create_sequential_mbuffer(400, 1000); - Shard *shards[2]; - auto shard1 = Shard(buffer1->get_buffer_view()); auto shard2 = Shard(buffer2->get_buffer_view()); - shards[0] = &shard1; - shards[1] = &shard2; + std::vector shards = {&shard1, &shard2}; - auto merged = Shard(shards, 2); + auto merged = Shard(shards); for (size_t i=100; i<1000; i++) { Rec r; -- cgit v1.2.3