summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-01-15 15:16:20 -0500
committerDouglas B. Rumbaugh <doug@douglasrumbaugh.com>2024-01-15 15:16:20 -0500
commitb485685968c7ab626d98cc2a84a122d7ca3c68ce (patch)
tree7c3343b70ebd592f9506e06272babac37fd1a1f4
parentcf178ae74a76b780b655a447531d2114f9f81d98 (diff)
downloaddynamic-extension-b485685968c7ab626d98cc2a84a122d7ca3c68ce.tar.gz
Use 16-byte CAS to control buffer head
-rw-r--r--CMakeLists.txt27
-rw-r--r--include/framework/structure/MutableBuffer.h120
2 files changed, 77 insertions, 70 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3b2a1ad..8ca7cb7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -14,8 +14,9 @@ set(old_bench False)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin")
+set(CMAKE_CXX_FLAGS=-latomic -mcx16)
-add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal)
+add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal -mcx16)
if (debug)
add_compile_options(-g -O0)
@@ -37,15 +38,18 @@ if (tests)
#target_include_directories(augbtree_tests PRIVATE include external/psudb-common/cpp/include external/ctpl)
add_executable(internal_level_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/internal_level_tests.cpp)
- target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread)
+ target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(internal_level_tests PUBLIC -mcx16)
target_include_directories(internal_level_tests PRIVATE include external/psudb-common/cpp/include)
add_executable(mutable_buffer_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/mutable_buffer_tests.cpp)
- target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread)
+ target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(mutable_buffer_tests PUBLIC -mcx16)
target_include_directories(mutable_buffer_tests PRIVATE include external/psudb-common/cpp/include)
add_executable(rangequery_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/rangequery_tests.cpp)
- target_link_libraries(rangequery_tests PUBLIC gsl check subunit pthread)
+ target_link_libraries(rangequery_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(rangequery_tests PUBLIC -mcx16)
target_include_directories(rangequery_tests PRIVATE include external/psudb-common/cpp/include)
#add_executable(vptree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/vptree_tests.cpp)
@@ -53,23 +57,28 @@ if (tests)
#target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include)
add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
- target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
+ target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_tier_tag PUBLIC -mcx16)
target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
- target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
+ target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_tier_tomb PUBLIC -mcx16)
target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
- target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
+ target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_level_tag PUBLIC -mcx16)
target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
- target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
+ target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_level_tomb PUBLIC -mcx16)
target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
- target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
+ target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread atomic)
+ target_link_options(memisam_tests PUBLIC -mcx16)
target_include_directories(memisam_tests PRIVATE include external/psudb-common/cpp/include)
#add_executable(alias_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/alias_tests.cpp)
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 5b655fc..eeb3dc9 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -36,16 +36,20 @@ namespace de {
template <RecordInterface R>
class MutableBuffer {
friend class BufferView<R>;
+
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
+
public:
MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
: m_lwm(low_watermark)
, m_hwm(high_watermark)
, m_cap((capacity == 0) ? 2 * high_watermark : capacity)
, m_tail(0)
- , m_head(0)
- , m_head_refcnt(0)
- , m_old_head(high_watermark)
- , m_old_head_refcnt(0)
+ , m_head({0, 0})
+ , m_old_head({high_watermark, 0})
, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
, m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
, m_tscnt(0)
@@ -94,7 +98,7 @@ public:
}
size_t get_record_count() {
- return m_tail - m_head;
+ return m_tail.load() - m_head.load().head_idx;
}
size_t get_capacity() {
@@ -130,16 +134,17 @@ public:
}
BufferView<R> get_buffer_view() {
- m_head_refcnt.fetch_add(1);
- auto f = std::bind(release_head_reference, (void *) this, m_head.load());
- return BufferView<R>(m_data, m_cap, m_head.load(), m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ size_t head = get_head();
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
}
BufferView<R> get_flush_buffer_view() {
- m_head_refcnt.fetch_add(1);
- auto f = std::bind(release_head_reference, (void *) this, m_head.load());
- return BufferView<R>(m_data, m_cap, m_head.load(), m_head.load() + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
+ size_t head = get_head();
+ auto f = std::bind(release_head_reference, (void *) this, head);
+ return BufferView<R>(m_data, m_cap, head, head + m_lwm, m_tscnt.load(), m_tombstone_filter, f);
}
/*
@@ -148,38 +153,41 @@ public:
* assign new_head to old_head.
*/
bool advance_head(size_t new_head) {
- assert(new_head > m_head.load());
+ assert(new_head > m_head.load().head_idx);
assert(new_head <= m_tail.load());
/* refuse to advance head while there is an old with one references */
- if (m_old_head_refcnt > 0) {
+ if (m_old_head.load().refcnt > 0) {
fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts");
return false;
}
m_active_head_advance.store(true);
- /*
- * the order here is very important. We first store zero to the
- * old_refcnt (should be zero anyway). Then we move the current
- * head to old head. At this point, any new buffer views should
- * increment the old head refcnt, so no new references to the
- * current head will be taken. Then we add the current head
- * refcnt to this. This is to ensure that no references get
- * dropped. Only after this do we change to the new head
- */
- m_old_head_refcnt.store(0);
-
- m_old_head.store(m_head.load());
- m_old_head_refcnt.fetch_add(m_head_refcnt);
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
- m_head_refcnt.store(0);
- m_head.store(new_head);
+ /* move the current head into the old head */
+ do {
+ buffer_head cur_hd = m_head.load();
+ m_old_head.store(cur_hd);
+ } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
m_active_head_advance.store(false);
return true;
}
+ size_t get_head() {
+ buffer_head cur_hd, new_hd;
+
+ do {
+ cur_hd = m_head.load();
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ return new_hd.head_idx;
+ }
+
void set_low_watermark(size_t lwm) {
assert(lwm < m_hwm);
m_lwm = lwm;
@@ -213,7 +221,7 @@ public:
* space.
*/
size_t get_available_capacity() {
- return m_cap - (m_tail.load() - m_old_head.load());
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
private:
@@ -221,7 +229,7 @@ private:
size_t old_value = m_tail.load();
/* if full, fail to advance the tail */
- if (old_value - m_head.load() >= m_hwm) {
+ if (old_value - m_head.load().head_idx >= m_hwm) {
return -1;
}
@@ -244,33 +252,26 @@ private:
static void release_head_reference(void *buff, size_t head) {
MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
- /*
- * check old head first. During a head transition, the head being
- * retired will first be assigned to *both* head and old_head. As
- * a result, any refcnt updates during this time should be applied
- * to old_head, even if the current head and the head being released
- * also match.
- */
- if (head == buffer->m_old_head.load()) {
- assert(buffer->m_old_head_refcnt > 0);
- buffer->m_old_head_refcnt.fetch_sub(1);
- /*
- * if the old head refcnt drops to 0, free
- * the records by setting old_head = head
- * before this, spin while the two heads are equal to
- * avoid
- */
- while (buffer->m_active_head_advance.load()) {
- _mm_pause();
- }
-
- if (buffer->m_old_head_refcnt.load() == 0) {
- buffer->m_old_head.store(buffer->m_head);
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_head.load().head_idx == head) {
+ cur_hd = buffer->m_head;
+ assert(cur_hd.refcnt > 0);
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ } else {
+ cur_hd = buffer->m_old_head;
+ assert(cur_hd.refcnt > 0);
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
}
- } else if (head == buffer->m_head.load()) {
- assert(buffer->m_head_refcnt > 0);
- buffer->m_head_refcnt.fetch_sub(1);
- }
+ _mm_pause();
+ } while(true);
}
size_t m_lwm;
@@ -279,11 +280,8 @@ private:
alignas(64) std::atomic<size_t> m_tail;
- alignas(64) std::atomic<size_t> m_head;
- alignas(64) std::atomic<size_t> m_head_refcnt;
-
- alignas(64) std::atomic<size_t> m_old_head;
- alignas(64) std::atomic<size_t> m_old_head_refcnt;
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
Wrapped<R>* m_data;
psudb::BloomFilter<R>* m_tombstone_filter;