From b485685968c7ab626d98cc2a84a122d7ca3c68ce Mon Sep 17 00:00:00 2001
From: "Douglas B. Rumbaugh"
Date: Mon, 15 Jan 2024 15:16:20 -0500
Subject: Use 16-byte CAS to control buffer head

---
Review notes (not part of the commit message):
 * CMakeLists.txt: `set(CMAKE_CXX_FLAGS=-latomic -mcx16)` defined a variable
   literally named "CMAKE_CXX_FLAGS=-latomic" and left CMAKE_CXX_FLAGS
   untouched. It now appends -mcx16 to CMAKE_CXX_FLAGS; libatomic is linked
   through the target_link_libraries(... atomic) lines below.
 * advance_head(): the loop body redeclared cur_hd, so the
   compare_exchange_strong() in the loop condition compared against the outer
   cur_hd, which is uninitialized on the first pass. The loop now assigns the
   outer variable, so the value copied to m_old_head is the one being swapped
   out.
 * release_head_reference(): m_head was loaded once for the index check and a
   second time for the CAS expected value, so the head could change in
   between. It is now loaded once per iteration.
 * std::atomic template arguments, lost when this patch was extracted as text,
   are restored from usage. Other angle-bracketed template text is still
   missing and has not been reconstructed.

 CMakeLists.txt                              |  27 ++++---
 include/framework/structure/MutableBuffer.h | 120 ++++++++++++++--------------
 2 files changed, 77 insertions(+), 70 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3b2a1ad..8ca7cb7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -14,8 +14,9 @@ set(old_bench False)
 set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 
 set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcx16")
 
-add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal)
+add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal -mcx16)
 
 if (debug)
     add_compile_options(-g -O0)
@@ -37,15 +38,18 @@ if (tests)
     #target_include_directories(augbtree_tests PRIVATE include external/psudb-common/cpp/include external/ctpl)
 
     add_executable(internal_level_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/internal_level_tests.cpp)
-    target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread)
+    target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread atomic)
+    target_link_options(internal_level_tests PUBLIC -mcx16)
     target_include_directories(internal_level_tests PRIVATE include external/psudb-common/cpp/include)
 
     add_executable(mutable_buffer_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/mutable_buffer_tests.cpp)
-    target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread)
+    target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread atomic)
+    target_link_options(mutable_buffer_tests PUBLIC -mcx16)
     target_include_directories(mutable_buffer_tests PRIVATE include external/psudb-common/cpp/include)
 
     add_executable(rangequery_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/rangequery_tests.cpp)
-    target_link_libraries(rangequery_tests PUBLIC gsl check subunit pthread)
+    target_link_libraries(rangequery_tests PUBLIC gsl check subunit pthread atomic)
+    target_link_options(rangequery_tests PUBLIC -mcx16)
     target_include_directories(rangequery_tests PRIVATE include external/psudb-common/cpp/include)
 
     #add_executable(vptree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/vptree_tests.cpp)
@@ -53,23 +57,28 @@ if (tests)
     #target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include)
 
     add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
-    target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
+    target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread atomic)
+    target_link_options(de_tier_tag PUBLIC -mcx16)
     target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
 
     add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
-    target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
+    target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread atomic)
+    target_link_options(de_tier_tomb PUBLIC -mcx16)
     target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
 
     add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
-    target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
+    target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
+    target_link_options(de_level_tag PUBLIC -mcx16)
     target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
 
     add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
-    target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
+    target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
+    target_link_options(de_level_tomb PUBLIC -mcx16)
     target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
 
     add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
-    target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
+    target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread atomic)
+    target_link_options(memisam_tests PUBLIC -mcx16)
     target_include_directories(memisam_tests PRIVATE include external/psudb-common/cpp/include)
 
     #add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp)
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 5b655fc..eeb3dc9 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -36,16 +36,20 @@ namespace de {
 template
 class MutableBuffer {
     friend class BufferView;
+
+    struct buffer_head {
+        size_t head_idx;
+        size_t refcnt;
+    };
+
 public:
     MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
         : m_lwm(low_watermark)
         , m_hwm(high_watermark)
         , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
         , m_tail(0)
-        , m_head(0)
-        , m_head_refcnt(0)
-        , m_old_head(high_watermark)
-        , m_old_head_refcnt(0)
+        , m_head({0, 0})
+        , m_old_head({high_watermark, 0})
         , m_data((Wrapped *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped)))
         , m_tombstone_filter(new psudb::BloomFilter(BF_FPR, m_hwm, BF_HASH_FUNCS))
         , m_tscnt(0)
@@ -94,7 +98,7 @@ public:
     }
 
     size_t get_record_count() {
-        return m_tail - m_head;
+        return m_tail.load() - m_head.load().head_idx;
     }
 
     size_t get_capacity() {
@@ -130,16 +134,17 @@ public:
     }
 
     BufferView get_buffer_view() {
-        m_head_refcnt.fetch_add(1);
-        auto f = std::bind(release_head_reference, (void *) this, m_head.load());
-        return BufferView(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+        size_t head = get_head();
+        auto f = std::bind(release_head_reference, (void *) this, head);
+
+        return BufferView(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
     }
 
 
     BufferView get_flush_buffer_view() {
-        m_head_refcnt.fetch_add(1);
-        auto f = std::bind(release_head_reference, (void *) this, m_head.load());
-        return BufferView(m_data, m_cap, m_head.load(), m_head.load() + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+        size_t head = get_head();
+        auto f = std::bind(release_head_reference, (void *) this, head);
+        return BufferView(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
     }
 
     /*
@@ -148,38 +153,41 @@ public:
      * assign new_head to old_head.
      */
     bool advance_head(size_t new_head) {
-        assert(new_head > m_head.load());
+        assert(new_head > m_head.load().head_idx);
         assert(new_head <= m_tail.load());
 
         /* refuse to advance head while there is an old with one references */
-        if (m_old_head_refcnt > 0) {
+        if (m_old_head.load().refcnt > 0) {
             fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts");
             return false;
         }
 
         m_active_head_advance.store(true);
 
-        /*
-         * the order here is very important. We first store zero to the
-         * old_refcnt (should be zero anyway). Then we move the current
-         * head to old head. At this point, any new buffer views should
-         * increment the old head refcnt, so no new references to the
-         * current head will be taken. Then we add the current head
-         * refcnt to this. This is to ensure that no references get
-         * dropped. Only after this do we change to the new head
-         */
-        m_old_head_refcnt.store(0);
-
-        m_old_head.store(m_head.load());
-        m_old_head_refcnt.fetch_add(m_head_refcnt);
+        buffer_head new_hd = {new_head, 0};
+        buffer_head cur_hd;
 
-        m_head_refcnt.store(0);
-        m_head.store(new_head);
+        /* move the current head into the old head */
+        do {
+            cur_hd = m_head.load();
+            m_old_head.store(cur_hd);
+        } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
 
         m_active_head_advance.store(false);
         return true;
     }
 
+    size_t get_head() {
+        buffer_head cur_hd, new_hd;
+
+        do {
+            cur_hd = m_head.load();
+            new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+        } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+        return new_hd.head_idx;
+    }
+
     void set_low_watermark(size_t lwm) {
         assert(lwm < m_hwm);
         m_lwm = lwm;
@@ -213,7 +221,7 @@ public:
      * space.
      */
     size_t get_available_capacity() {
-        return m_cap - (m_tail.load() - m_old_head.load());
+        return m_cap - (m_tail.load() - m_old_head.load().head_idx);
     }
 
 private:
@@ -221,7 +229,7 @@ private:
         size_t old_value = m_tail.load();
 
         /* if full, fail to advance the tail */
-        if (old_value - m_head.load() >= m_hwm) {
+        if (old_value - m_head.load().head_idx >= m_hwm) {
             return -1;
         }
 
@@ -244,33 +252,26 @@ private:
     static void release_head_reference(void *buff, size_t head) {
         MutableBuffer *buffer = (MutableBuffer *) buff;
 
-        /*
-         * check old head first. During a head transition, the head being
-         * retired will first be assigned to *both* head and old_head. As
-         * a result, any refcnt updates during this time should be applied
-         * to old_head, even if the current head and the head being released
-         * also match.
-         */
-        if (head == buffer->m_old_head.load()) {
-            assert(buffer->m_old_head_refcnt > 0);
-            buffer->m_old_head_refcnt.fetch_sub(1);
-            /*
-             * if the old head refcnt drops to 0, free
-             * the records by setting old_head = head
-             * before this, spin while the two heads are equal to
-             * avoid
-             */
-            while (buffer->m_active_head_advance.load()) {
-                _mm_pause();
-            }
-
-            if (buffer->m_old_head_refcnt.load() == 0) {
-                buffer->m_old_head.store(buffer->m_head);
+        buffer_head cur_hd, new_hd;
+        do {
+            cur_hd = buffer->m_head.load();
+            if (cur_hd.head_idx == head) {
+                assert(cur_hd.refcnt > 0);
+                new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+
+                if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+                    break;
+                }
+            } else {
+                cur_hd = buffer->m_old_head;
+                assert(cur_hd.refcnt > 0);
+                new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+                if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+                    break;
+                }
             }
-        } else if (head == buffer->m_head.load()) {
-            assert(buffer->m_head_refcnt > 0);
-            buffer->m_head_refcnt.fetch_sub(1);
-        }
+            _mm_pause();
+        } while(true);
     }
 
     size_t m_lwm;
@@ -279,11 +280,8 @@ private:
 
     alignas(64) std::atomic<size_t> m_tail;
 
-    alignas(64) std::atomic<size_t> m_head;
-    alignas(64) std::atomic<size_t> m_head_refcnt;
-
-    alignas(64) std::atomic<size_t> m_old_head;
-    alignas(64) std::atomic<size_t> m_old_head_refcnt;
+    alignas(64) std::atomic<buffer_head> m_head;
+    alignas(64) std::atomic<buffer_head> m_old_head;
 
     Wrapped* m_data;
     psudb::BloomFilter* m_tombstone_filter;
-- 
cgit v1.2.3