 .gitmodules                                    |  3
 CMakeLists.txt                                 | 10
 external/ctpl                                  |  0
 include/framework/DynamicExtension.h           | 77
 include/framework/scheduling/Epoch.h           | 45
 include/framework/scheduling/FIFOScheduler.h   | 13
 include/framework/scheduling/SerialScheduler.h |  2
 include/framework/scheduling/Task.h            |  2
 include/framework/structure/InternalLevel.h    | 81
 include/framework/structure/MutableBuffer.h    | 17
 tests/dynamic_extension_tests.inc              |  6
 11 files changed, 170 insertions, 86 deletions
diff --git a/.gitmodules b/.gitmodules
index 7616a12..0b195bd 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -16,3 +16,6 @@
 [submodule "external/psudb-common"]
 	path = external/psudb-common
 	url = git@github.com:PSU-Database-Systems-Group/psudb-common
+[submodule "external/ctpl"]
+	path = external/ctpl
+	url = git@github.com:vit-vit/CTPL.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c4f28e0..d0e14c1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,7 +12,7 @@ set(bench false)
 
 set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin")
 
-add_compile_options(-Iinclude -Iexternal/PLEX/include)
+add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal)
 
 if (debug)
     add_compile_options(-g -O0)
@@ -47,19 +47,19 @@ if (tests)
     add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
     target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
-    target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include)
+    target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
 
     add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
     target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
-    target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include)
+    target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include external)
 
     add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
     target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
-    target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include)
+    target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
 
     add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
     target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
-    target_include_directories(de_level_tomb PRIVATE include external/psudb-common/cpp/include)
+    target_include_directories(de_level_tomb PRIVATE include external/ctpl external/psudb-common/cpp/include external)
 
     add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
     target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
diff --git a/external/ctpl b/external/ctpl
new file mode 160000
+Subproject commit 437e135dbd94eb65b45533d9ce8ee28b5bd37b6
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 0858fc3..233bebb 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -198,8 +198,8 @@ public:
      */
     void await_next_epoch() {
         while (m_current_epoch.load() != m_newest_epoch.load()) {
-            std::unique_lock<std::mutex> m_epoch_cv_lk;
-            m_epoch_cv.wait(m_epoch_cv_lk);
+            std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
+            m_epoch_cv.wait(lk);
         }
 
         return;
@@ -238,14 +238,26 @@ private:
     }
 
     _Epoch *get_active_epoch_protected() {
-        m_epochs[m_current_epoch.load()]->start_job();
-        return m_epochs[m_current_epoch.load()];
+        ssize_t cur_epoch = -1;
+        do {
+            if (cur_epoch != -1) {
+                m_epochs[cur_epoch]->end_job();
+            }
+
+            cur_epoch = m_current_epoch.load();
+            m_epochs[cur_epoch]->start_job();
+        } while (cur_epoch != m_current_epoch.load());
+
+        return m_epochs[cur_epoch];
     }
 
     void advance_epoch() {
         size_t new_epoch_num = m_newest_epoch.load();
+        size_t old_epoch_num = m_current_epoch.load();
+        assert(new_epoch_num != old_epoch_num);
+
         _Epoch *new_epoch = m_epochs[new_epoch_num];
-        _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+        _Epoch *old_epoch = m_epochs[old_epoch_num];
 
         /*
          * Update the new Epoch to contain the buffers from the old one
@@ -255,7 +267,11 @@ private:
          */
         if constexpr (!std::same_as<SCHED, SerialScheduler>) {
             size_t old_buffer_cnt = new_epoch->clear_buffers();
-            for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+            // FIXME: this is getting nightmarish... The -1 ensures that the empty
+            // buffer added when the merge was first triggered is also included.
+            // Due to the reordering of operations in internal_append, the new buffer
+            // exists at the time of the clone, and so is already in the new epoch.
+            for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
                 new_epoch->add_buffer(old_epoch->get_buffers()[i]);
             }
         }
@@ -298,14 +314,22 @@ private:
      * buffer while a new epoch is being created in the background. Returns a
      * pointer to the newly created buffer.
      */
-    Buffer *add_empty_buffer(_Epoch *epoch) {
-        auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+    Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) {
+        auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
 
         std::unique_lock<std::mutex> m_struct_lock;
+        auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer);
+        /*
+         * if epoch->add_buffer doesn't add the new buffer, this insert
+         * won't update the buffer set (duplicate insert)
+         */
         m_buffers.insert(new_buffer);
         m_struct_lock.release();
 
-        epoch->add_buffer(new_buffer);
+        if (new_buffer != temp_buffer) {
+            delete temp_buffer;
+        }
+
         return new_buffer;
     }
 
@@ -448,24 +472,41 @@ private:
     int internal_append(const R &rec, bool ts) {
         Buffer *buffer = nullptr;
+        int res = 0;
         do {
-            // FIXME: figure out best way to protect this epoch access
-            auto epoch = get_active_epoch();
+            auto epoch = get_active_epoch_protected();
             buffer = epoch->get_active_buffer();
+            assert(buffer);
 
-            /* if the buffer is full, schedule a merge and add a new empty buffer */
+            /*
+             * If the buffer is full and there is no current merge,
+             * schedule a merge and add a new empty buffer. If there
+             * is a current merge, then just add a new empty buffer
+             * to the current epoch.
+             */
             if (buffer->is_full()) {
-                // FIXME: possible race here--two identical merges could be scheduled
-                auto vers = epoch->get_structure();
-                schedule_merge();
-
                 if constexpr (std::same_as<SCHED, SerialScheduler>) {
+                    /* single threaded: run merge and then empty buffer */
+                    epoch->end_job();
+                    schedule_merge();
                     buffer->truncate();
-                } else {
+                    continue;
+                } else if (epoch->prepare_merge()) {
+                    /*
+                     * add an empty buffer to allow inserts to proceed and
+                     * schedule a merge on a background thread
+                     */
                     buffer = add_empty_buffer(epoch);
+                    schedule_merge();
+                } else {
+                    /* background merge is ongoing, so just add empty buffer */
+                    buffer = add_empty_buffer(epoch, buffer);
                 }
             }
-        } while(!buffer->append(rec, ts));
+
+            res = buffer->append(rec, ts);
+            epoch->end_job();
+        } while(!res);
 
         /* internal append should always succeed, eventually */
         return 1;
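The retry loop in get_active_epoch_protected() above closes a race: the current epoch could be retired between reading m_current_epoch and registering a job on it. A minimal standalone sketch of the same pin-then-verify pattern, using illustrative names rather than the framework's actual types:

    #include <atomic>

    struct Epoch {
        std::atomic<int> active_jobs{0};
        void start_job() { active_jobs.fetch_add(1); }
        void end_job()   { active_jobs.fetch_sub(1); }
    };

    /*
     * Register on whichever epoch is current, then re-check that it is
     * still current. If advance_epoch() ran in between, undo the
     * registration and retry against the newer epoch.
     */
    Epoch *pin_current_epoch(Epoch *epochs[], std::atomic<size_t> &current) {
        size_t idx = current.load();
        epochs[idx]->start_job();
        while (idx != current.load()) {
            epochs[idx]->end_job();
            idx = current.load();
            epochs[idx]->start_job();
        }
        return epochs[idx];
    }

Once the job count is nonzero the epoch cannot be retired out from under the caller, which is why internal_append() can safely work with the active buffer up until it calls end_job().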
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index f4aefe9..0ebbde9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -25,6 +25,7 @@ public:
     Epoch(size_t number=0)
         : m_buffers()
         , m_structure(nullptr)
+        , m_active_merge(false)
         , m_active_jobs(0)
         , m_active(true)
         , m_epoch_number(number)
@@ -34,6 +35,7 @@ public:
         : m_buffers()
         , m_structure(structure)
         , m_active_jobs(0)
+        , m_active_merge(false)
         , m_active(true)
         , m_epoch_number(number)
     {
@@ -54,11 +56,25 @@ public:
         }
     }
 
-    void add_buffer(Buffer *buf) {
+    Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
         assert(buf);
+
+        /*
+         * if a current buffer is specified, only add the new buffer
+         * if the active buffer is the current one; otherwise just
+         * return the active buffer (poor man's CAS).
+         */
+        if (cur_buf) {
+            auto active_buf = get_active_buffer();
+            if (active_buf != cur_buf) {
+                return active_buf;
+            }
+        }
+
         buf->take_reference();
         m_buffers.push_back(buf);
+        return buf;
     }
 
     void start_job() {
@@ -137,6 +153,31 @@ public:
         return epoch;
     }
 
+    /*
+     * Check if a merge can be started from this Epoch.
+     * At present, without concurrent merging, this simply
+     * checks if there is currently a scheduled merge based
+     * on this Epoch. If there is, returns false. If there
+     * isn't, returns true and sets a flag indicating that
+     * there is an active merge.
+     */
+    bool prepare_merge() {
+        auto old = m_active_merge.load();
+        if (old) {
+            return false;
+        }
+
+        // FIXME: this needs cleaning up
+        while (!m_active_merge.compare_exchange_strong(old, true)) {
+            old = m_active_merge.load();
+            if (old) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     void set_inactive() {
         m_active = false;
     }
@@ -170,6 +211,8 @@ private:
     std::condition_variable m_active_cv;
     std::mutex m_cv_lock;
 
+    std::atomic<bool> m_active_merge;
+
     /*
      * The number of currently active jobs
      * (queries/merges) operating on this
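The FIXME in prepare_merge() flags the load-then-CAS loop as clumsy. Under the stated semantics (exactly one caller flips the flag from false to true and wins; everyone else backs off), a single compare_exchange_strong suffices. This is a possible simplification, not the committed code:

    #include <atomic>

    std::atomic<bool> m_active_merge{false};

    /*
     * Exactly one caller observes the false -> true transition and wins
     * the right to schedule the merge; all others see the flag already
     * set and return false.
     */
    bool prepare_merge() {
        bool expected = false;
        return m_active_merge.compare_exchange_strong(expected, true);
    }

The "poor man's CAS" in add_buffer() works on the same principle: the caller states which buffer it believes is active, and the epoch only installs a new one if that belief still holds, otherwise handing back the buffer that won.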
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 91a72b3..1521eb6 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -24,20 +24,26 @@
 #include "framework/structure/ExtensionStructure.h"
 #include "framework/scheduling/Task.h"
+#include "ctpl/ctpl.h"
 #include "psu-ds/LockedPriorityQueue.h"
 
 namespace de {
+
 class FIFOScheduler {
+private:
+    static const size_t DEFAULT_MAX_THREADS = 8;
+
 public:
     FIFOScheduler(size_t memory_budget, size_t thread_cnt)
         : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
-        , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+        , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
         , m_used_memory(0)
         , m_used_thrds(0)
         , m_shutdown(false)
     {
         m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+        m_thrd_pool.resize(m_thrd_cnt);
     }
 
     ~FIFOScheduler() {
@@ -72,6 +78,7 @@ private:
     std::condition_variable m_cv;
 
     std::thread m_sched_thrd;
+    ctpl::thread_pool m_thrd_pool;
 
     std::atomic<size_t> m_used_thrds;
     std::atomic<size_t> m_used_memory;
@@ -79,7 +86,7 @@ private:
     void schedule_next() {
         assert(m_task_queue.size() > 0);
         auto t = m_task_queue.pop();
-        t();
+        m_thrd_pool.push(t);
     }
 
     void run() {
@@ -87,7 +94,7 @@ private:
             std::unique_lock<std::mutex> cv_lock(m_cv_lock);
             m_cv.wait(cv_lock);
 
-            while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+            while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
                 schedule_next();
             }
         } while(!m_shutdown.load());
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 9c767e8..93611d1 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -47,7 +47,7 @@ public:
     void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
         size_t ts = m_counter++;
         auto t = Task(size, ts, job, args);
-        t();
+        t(0);
     }
 
     void shutdown() {
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 228665f..6dfd7df 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -54,7 +54,7 @@ struct Task {
         return self.m_timestamp > other.m_timestamp;
     }
 
-    void operator()() {
+    void operator()(size_t thrd_id) {
        m_job(m_args);
     }
 };
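The scheduler changes hang together because of CTPL's calling convention: ctpl::thread_pool::push() invokes the callable with the worker thread's id as its first argument, which is why Task::operator() gained a thrd_id parameter and SerialScheduler now calls t(0). A minimal usage sketch of the pool itself (the lambda and return-code handling are illustrative):

    #include "ctpl/ctpl.h"
    #include <cstdio>

    int main() {
        ctpl::thread_pool pool(4);   /* four workers, cf. m_thrd_pool.resize(m_thrd_cnt) */

        /* push() passes the worker id to the callable and returns a std::future */
        auto fut = pool.push([](int id) {
            std::printf("task ran on worker %d\n", id);
            return id;
        });

        return fut.get() >= 0 ? 0 : 1;
    }

n_idle(), used in run() above, reports how many workers are currently free, so the dispatch loop stops handing out tasks once the pool is saturated rather than tracking m_used_thrds by hand.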
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 7a7b98c..632fe17 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -33,32 +33,10 @@ public:
         : m_level_no(level_no)
         , m_shard_cnt(0)
         , m_shards(shard_cap, nullptr)
-        , m_owns(shard_cap, true)
         , m_pending_shard(nullptr)
     {}
 
-    // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
-    // WARNING: for leveling only.
-    InternalLevel(InternalLevel* level)
-        : m_level_no(level->m_level_no + 1)
-        , m_shard_cnt(level->m_shard_cnt)
-        , m_shards(level->m_shards.size(), nullptr)
-        , m_owns(level->m_owns.size(), true)
-        , m_pending_shard(nullptr)
-    {
-        assert(m_shard_cnt == 1 && m_shards.size() == 1);
-
-        for (size_t i=0; i<m_shards.size(); i++) {
-            level->m_owns[i] = false;
-            m_shards[i] = level->m_shards[i];
-        }
-    }
-
     ~InternalLevel() {
-        for (size_t i=0; i<m_shards.size(); i++) {
-            if (m_owns[i]) delete m_shards[i];
-        }
-
         delete m_pending_shard;
     }
 
@@ -69,10 +47,10 @@ public:
         auto res = new InternalLevel(base_level->m_level_no, 1);
         res->m_shard_cnt = 1;
         Shard* shards[2];
-        shards[0] = base_level->m_shards[0];
-        shards[1] = new_level->m_shards[0];
+        shards[0] = base_level->m_shards[0].get();
+        shards[1] = new_level->m_shards[0].get();
 
-        res->m_shards[0] = new S(shards, 2);
+        res->m_shards[0] = std::make_shared<S>(shards, 2);
         return std::shared_ptr<InternalLevel>(res);
     }
 
@@ -83,19 +61,23 @@ public:
             return;
         }
 
-        m_shards[m_shard_cnt] = new S(buffer);
-        m_owns[m_shard_cnt] = true;
+        m_shards[m_shard_cnt] = std::make_shared<S>(buffer);
         ++m_shard_cnt;
     }
 
     void append_merged_shards(InternalLevel* level) {
+        Shard *shards[level->m_shard_cnt];
+        for (size_t i=0; i<level->m_shard_cnt; i++) {
+            shards[i] = level->m_shards[i].get();
+        }
+
         if (m_shard_cnt == m_shards.size()) {
-            m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt);
+            m_pending_shard = new S(shards, level->m_shard_cnt);
             return;
         }
 
-        m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
-        m_owns[m_shard_cnt] = true;
+        auto tmp = new S(shards, level->m_shard_cnt);
+        m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
 
         ++m_shard_cnt;
     }
 
@@ -104,15 +86,10 @@ public:
     void finalize() {
         if (m_pending_shard) {
             for (size_t i=0; i<m_shards.size(); i++) {
-                if (m_owns[i]) {
-                    delete m_shards[i];
-                    m_shards[i] = nullptr;
-                    m_owns[i] = false;
-                }
+                m_shards[i] = nullptr;
             }
 
-            m_shards[0] = m_pending_shard;
-            m_owns[0] = true;
+            m_shards[0] = std::shared_ptr<S>(m_pending_shard);
             m_pending_shard = nullptr;
             m_shard_cnt = 1;
         }
@@ -126,7 +103,7 @@ public:
 
         Shard *shards[m_shard_cnt];
         for (size_t i=0; i<m_shard_cnt; i++) {
-            shards[i] = m_shards[i];
+            shards[i] = m_shards[i].get();
         }
 
         return new S(shards, m_shard_cnt);
@@ -136,8 +113,8 @@ public:
     void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
         for (size_t i=0; i<m_shard_cnt; i++) {
             if (m_shards[i]) {
-                auto shard_state = Q::get_query_state(m_shards[i], query_parms);
-                shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
+                auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms);
+                shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()});
                 shard_states.emplace_back(shard_state);
             }
         }
@@ -174,7 +151,7 @@ public:
     }
 
     Shard* get_shard(size_t idx) {
-        return m_shards[idx];
+        return m_shards[idx].get();
     }
 
     size_t get_shard_count() {
@@ -184,7 +161,9 @@ public:
     size_t get_record_count() {
         size_t cnt = 0;
         for (size_t i=0; i<m_shard_cnt; i++) {
-            cnt += m_shards[i]->get_record_count();
+            if (m_shards[i]) {
+                cnt += m_shards[i]->get_record_count();
+            }
         }
 
         return cnt;
@@ -193,7 +172,9 @@ public:
     size_t get_tombstone_count() {
         size_t res = 0;
         for (size_t i = 0; i < m_shard_cnt; ++i) {
-            res += m_shards[i]->get_tombstone_count();
+            if (m_shards[i]) {
+                res += m_shards[i]->get_tombstone_count();
+            }
         }
         return res;
     }
@@ -201,7 +182,9 @@ public:
     size_t get_aux_memory_usage() {
         size_t cnt = 0;
         for (size_t i=0; i<m_shard_cnt; i++) {
-            cnt += m_shards[i]->get_aux_memory_usage();
+            if (m_shards[i]) {
+                cnt += m_shards[i]->get_aux_memory_usage();
+            }
         }
 
         return cnt;
@@ -224,7 +207,7 @@ public:
         for (size_t i=0; i<m_shard_cnt; i++) {
             if (m_shards[i]) {
                 tscnt += m_shards[i]->get_tombstone_count();
-                reccnt += (*m_shards[i])->get_record_count();
+                reccnt += m_shards[i]->get_record_count();
             }
         }
 
@@ -235,8 +218,6 @@ public:
         auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
         for (size_t i=0; i<m_shard_cnt; i++) {
             new_level->m_shards[i] = m_shards[i];
-            new_level->m_owns[i] = true;
-            m_owns[i] = false;
         }
 
         return new_level;
@@ -248,12 +229,8 @@ private:
     size_t m_shard_cnt;
     size_t m_shard_size_cap;
 
-    std::vector<Shard*> m_shards;
-
+    std::vector<std::shared_ptr<Shard>> m_shards;
     Shard *m_pending_shard;
-
-    std::vector<bool> m_owns;
-
 };
 
 }
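Dropping the m_owns bitmap in favor of std::vector<std::shared_ptr<Shard>> moves shard lifetime to reference counting: a shard now lives until the last level (original, clone, or merged successor) that points at it goes away, which is the behavior concurrent epochs need. A small sketch of that ownership model, with a placeholder Shard type:

    #include <memory>
    #include <vector>

    struct Shard { /* placeholder */ };

    int main() {
        std::vector<std::shared_ptr<Shard>> level;
        level.push_back(std::make_shared<Shard>());

        /* clone() now just copies pointers; no m_owns flags to hand off */
        std::vector<std::shared_ptr<Shard>> cloned = level;

        level.clear();    /* shard survives: cloned still references it */
        cloned.clear();   /* last reference dropped: shard destroyed here */
        return 0;
    }

This is also why the accessors above hand out raw pointers via .get(): callers borrow a shard for the duration of an operation while the shared_ptr held by some live level keeps it alive.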
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a70b86b..ba25cc3 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -33,7 +33,7 @@ class MutableBuffer {
 public:
     MutableBuffer(size_t capacity, size_t max_tombstone_cap)
         : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
-        , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
+        , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
         m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
         m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
         m_tombstone_filter = nullptr;
@@ -83,6 +83,7 @@ public:
             m_weight.fetch_add(1);
         }
 
+        m_reccnt.fetch_add(1);
         return 1;
     }
 
@@ -91,6 +92,7 @@ public:
         m_reccnt.store(0);
         m_weight.store(0);
         m_max_weight.store(0);
+        m_tail.store(0);
         if (m_tombstone_filter) m_tombstone_filter->clear();
 
         return true;
@@ -193,11 +195,15 @@ public:
     }
 
 private:
-    int32_t try_advance_tail() {
-        size_t new_tail = m_reccnt.fetch_add(1);
+    int64_t try_advance_tail() {
+        int64_t new_tail = m_tail.fetch_add(1);
 
-        if (new_tail < m_cap) return new_tail;
-        else return -1;
+        if (new_tail < m_cap) {
+            return new_tail;
+        }
+
+        m_tail.fetch_add(-1);
+        return -1;
     }
 
     size_t m_cap;
@@ -210,6 +216,7 @@ private:
 
     alignas(64) std::atomic<size_t> m_tombstonecnt;
     alignas(64) std::atomic<uint32_t> m_reccnt;
+    alignas(64) std::atomic<int64_t> m_tail;
     alignas(64) std::atomic<double> m_weight;
     alignas(64) std::atomic<double> m_max_weight;
diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc
index aa12e31..bcb5ae6 100644
--- a/tests/dynamic_extension_tests.inc
+++ b/tests/dynamic_extension_tests.inc
@@ -76,6 +76,8 @@ START_TEST(t_insert_with_mem_merges)
         val++;
     }
 
+    test_de->await_next_epoch();
+
     ck_assert_int_eq(test_de->get_record_count(), 300);
     ck_assert_int_eq(test_de->get_height(), 1);
 
@@ -174,6 +176,8 @@ START_TEST(t_range_query)
         ck_assert_int_eq(test_de->insert(r), 1);
     }
 
+    test_de->await_next_epoch();
+
     std::sort(keys.begin(), keys.end());
 
     auto idx = rand() % (keys.size() - 250);
@@ -245,6 +249,8 @@ START_TEST(t_tombstone_merging_01)
         ck_assert(test_de->validate_tombstone_proportion());
     }
 
+    test_de->await_next_epoch();
+
     ck_assert(test_de->validate_tombstone_proportion());
 
     gsl_rng_free(rng);
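In MutableBuffer, slot reservation and record visibility are now split: try_advance_tail() claims a slot via fetch_add on the dedicated m_tail counter (undoing the claim on overflow), while m_reccnt is only incremented after the record is fully written, so concurrent readers never observe a claimed-but-unwritten slot. A condensed sketch of the idiom, independent of the buffer's record machinery:

    #include <atomic>
    #include <cstdint>

    struct SlotReserver {
        std::atomic<int64_t> m_tail{0};
        const int64_t m_cap;

        explicit SlotReserver(int64_t cap) : m_cap(cap) {}

        /* Claim the next slot; roll the counter back if the buffer is
         * full so repeated failed appends cannot overflow m_tail. */
        int64_t try_advance_tail() {
            int64_t new_tail = m_tail.fetch_add(1);
            if (new_tail < m_cap) {
                return new_tail;   /* slot reserved */
            }
            m_tail.fetch_add(-1);  /* full: undo the reservation */
            return -1;
        }
    };

The await_next_epoch() calls added to the tests serve the same end from the outside: each blocks until the background merge has produced and installed a new epoch, so record counts and tombstone proportions are stable before the assertions run.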