diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-11-06 10:01:23 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-11-06 10:20:03 -0500 |
| commit | 4e4cf858122ca6c1ae6d5f635e839089769fee38 (patch) | |
| tree | af2baf54991e8aca6766671c5701fbfd6dffed7d | |
| parent | 83ca486048a5053d8c75bb5041091edb1b183a85 (diff) | |
| download | dynamic-extension-4e4cf858122ca6c1ae6d5f635e839089769fee38.tar.gz | |
Scheduling: Switched over to a thread pool model
| -rw-r--r-- | .gitmodules | 3 | ||||
| -rw-r--r-- | CMakeLists.txt | 10 | ||||
| m--------- | external/ctpl | 0 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 13 | ||||
| -rw-r--r-- | include/framework/scheduling/SerialScheduler.h | 2 | ||||
| -rw-r--r-- | include/framework/scheduling/Task.h | 2 | ||||
| -rw-r--r-- | tests/de_level_tomb.cpp | 2 | ||||
| -rw-r--r-- | tests/dynamic_extension_tests.inc | 7 |
8 files changed, 28 insertions, 11 deletions
diff --git a/.gitmodules b/.gitmodules index 7616a12..0b195bd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -16,3 +16,6 @@ [submodule "external/psudb-common"] path = external/psudb-common url = git@github.com:PSU-Database-Systems-Group/psudb-common +[submodule "external/ctpl"] + path = external/ctpl + url = git@github.com:vit-vit/CTPL.git diff --git a/CMakeLists.txt b/CMakeLists.txt index c8188a9..2c16006 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,7 @@ set(bench false) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin") -add_compile_options(-Iinclude -Iexternal/PLEX/include) +add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal) if (debug) add_compile_options(-g -O0) @@ -51,19 +51,19 @@ if (tests) add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp) target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread) - target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include) + target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external) add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp) target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread) - target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include) + target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include external) add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp) target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread) - target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include) + target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external) add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp) target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread) - target_include_directories(de_level_tomb PRIVATE include external/psudb-common/cpp/include) + target_include_directories(de_level_tomb PRIVATE include external/ctpl external/psudb-common/cpp/include external) add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread) diff --git a/external/ctpl b/external/ctpl new file mode 160000 +Subproject 437e135dbd94eb65b45533d9ce8ee28b5bd37b6 diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 91a72b3..1521eb6 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -24,20 +24,26 @@ #include "framework/structure/ExtensionStructure.h" #include "framework/scheduling/Task.h" +#include "ctpl/ctpl.h" #include "psu-ds/LockedPriorityQueue.h" namespace de { + class FIFOScheduler { +private: + static const size_t DEFAULT_MAX_THREADS = 8; + public: FIFOScheduler(size_t memory_budget, size_t thread_cnt) : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX) - , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX) + , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS) , m_used_memory(0) , m_used_thrds(0) , m_shutdown(false) { m_sched_thrd = std::thread(&FIFOScheduler::run, this); + m_thrd_pool.resize(m_thrd_cnt); } ~FIFOScheduler() { @@ -72,6 +78,7 @@ private: std::condition_variable m_cv; std::thread m_sched_thrd; + ctpl::thread_pool m_thrd_pool; std::atomic<size_t> m_used_thrds; std::atomic<size_t> m_used_memory; @@ -79,7 +86,7 @@ private: void schedule_next() { assert(m_task_queue.size() > 0); auto t = m_task_queue.pop(); - t(); + m_thrd_pool.push(t); } void run() { @@ -87,7 +94,7 @@ private: std::unique_lock<std::mutex> cv_lock(m_cv_lock); m_cv.wait(cv_lock); - while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) { + while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) { schedule_next(); } } while(!m_shutdown.load()); diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index 9c767e8..93611d1 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -47,7 +47,7 @@ public: void schedule_job(std::function<void(void*)> job, size_t size, void *args) { size_t ts = m_counter++; auto t = Task(size, ts, job, args); - t(); + t(0); } void shutdown() { diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 228665f..6dfd7df 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -54,7 +54,7 @@ struct Task { return self.m_timestamp > other.m_timestamp; } - void operator()() { + void operator()(size_t thrd_id) { m_job(m_args); } }; diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp index 264b5b0..ba3f784 100644 --- a/tests/de_level_tomb.cpp +++ b/tests/de_level_tomb.cpp @@ -20,6 +20,6 @@ #include <check.h> using namespace de; -typedef DynamicExtension<WRec, WIRS<WRec>, WIRSQuery<WRec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE; +typedef DynamicExtension<WRec, WIRS<WRec>, WIRSQuery<WRec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE; #include "dynamic_extension_tests.inc" diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc index 2f82e92..df88d98 100644 --- a/tests/dynamic_extension_tests.inc +++ b/tests/dynamic_extension_tests.inc @@ -77,6 +77,8 @@ START_TEST(t_insert_with_mem_merges) val++; } + ext_wirs->await_next_epoch(); + ck_assert_int_eq(ext_wirs->get_record_count(), 300); ck_assert_int_eq(ext_wirs->get_height(), 1); @@ -197,6 +199,9 @@ START_TEST(t_range_sample_weighted) WRec r = {keys[i], (uint32_t) i, weight}; ext_wirs->insert(r); } + + ext_wirs->await_next_epoch(); + size_t k = 1000; uint64_t lower_key = 0; uint64_t upper_key = 5; @@ -277,6 +282,8 @@ START_TEST(t_tombstone_merging_01) ck_assert(ext_wirs->validate_tombstone_proportion()); } + ext_wirs->await_next_epoch(); + ck_assert(ext_wirs->validate_tombstone_proportion()); gsl_rng_free(rng); |