summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 10:01:23 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-11-06 10:20:03 -0500
commit4e4cf858122ca6c1ae6d5f635e839089769fee38 (patch)
treeaf2baf54991e8aca6766671c5701fbfd6dffed7d
parent83ca486048a5053d8c75bb5041091edb1b183a85 (diff)
downloaddynamic-extension-4e4cf858122ca6c1ae6d5f635e839089769fee38.tar.gz
Scheduling: Switched over to a thread pool model
-rw-r--r--.gitmodules3
-rw-r--r--CMakeLists.txt10
m---------external/ctpl0
-rw-r--r--include/framework/scheduling/FIFOScheduler.h13
-rw-r--r--include/framework/scheduling/SerialScheduler.h2
-rw-r--r--include/framework/scheduling/Task.h2
-rw-r--r--tests/de_level_tomb.cpp2
-rw-r--r--tests/dynamic_extension_tests.inc7
8 files changed, 28 insertions, 11 deletions
diff --git a/.gitmodules b/.gitmodules
index 7616a12..0b195bd 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -16,3 +16,6 @@
[submodule "external/psudb-common"]
path = external/psudb-common
url = git@github.com:PSU-Database-Systems-Group/psudb-common
+[submodule "external/ctpl"]
+ path = external/ctpl
+ url = git@github.com:vit-vit/CTPL.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c8188a9..2c16006 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,7 +12,7 @@ set(bench false)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin")
-add_compile_options(-Iinclude -Iexternal/PLEX/include)
+add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal)
if (debug)
add_compile_options(-g -O0)
@@ -51,19 +51,19 @@ if (tests)
add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
- target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
- target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
- target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
- target_include_directories(de_level_tomb PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_level_tomb PRIVATE include external/ctpl external/psudb-common/cpp/include external)
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
diff --git a/external/ctpl b/external/ctpl
new file mode 160000
+Subproject 437e135dbd94eb65b45533d9ce8ee28b5bd37b6
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 91a72b3..1521eb6 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -24,20 +24,26 @@
#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Task.h"
+#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
namespace de {
+
class FIFOScheduler {
+private:
+ static const size_t DEFAULT_MAX_THREADS = 8;
+
public:
FIFOScheduler(size_t memory_budget, size_t thread_cnt)
: m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
, m_used_memory(0)
, m_used_thrds(0)
, m_shutdown(false)
{
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+ m_thrd_pool.resize(m_thrd_cnt);
}
~FIFOScheduler() {
@@ -72,6 +78,7 @@ private:
std::condition_variable m_cv;
std::thread m_sched_thrd;
+ ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_thrds;
std::atomic<size_t> m_used_memory;
@@ -79,7 +86,7 @@ private:
void schedule_next() {
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
- t();
+ m_thrd_pool.push(t);
}
void run() {
@@ -87,7 +94,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+ while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
schedule_next();
}
} while(!m_shutdown.load());
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 9c767e8..93611d1 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -47,7 +47,7 @@ public:
void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
size_t ts = m_counter++;
auto t = Task(size, ts, job, args);
- t();
+ t(0);
}
void shutdown() {
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 228665f..6dfd7df 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -54,7 +54,7 @@ struct Task {
return self.m_timestamp > other.m_timestamp;
}
- void operator()() {
+ void operator()(size_t thrd_id) {
m_job(m_args);
}
};
diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp
index 264b5b0..ba3f784 100644
--- a/tests/de_level_tomb.cpp
+++ b/tests/de_level_tomb.cpp
@@ -20,6 +20,6 @@
#include <check.h>
using namespace de;
-typedef DynamicExtension<WRec, WIRS<WRec>, WIRSQuery<WRec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef DynamicExtension<WRec, WIRS<WRec>, WIRSQuery<WRec>, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
#include "dynamic_extension_tests.inc"
diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc
index 2f82e92..df88d98 100644
--- a/tests/dynamic_extension_tests.inc
+++ b/tests/dynamic_extension_tests.inc
@@ -77,6 +77,8 @@ START_TEST(t_insert_with_mem_merges)
val++;
}
+ ext_wirs->await_next_epoch();
+
ck_assert_int_eq(ext_wirs->get_record_count(), 300);
ck_assert_int_eq(ext_wirs->get_height(), 1);
@@ -197,6 +199,9 @@ START_TEST(t_range_sample_weighted)
WRec r = {keys[i], (uint32_t) i, weight};
ext_wirs->insert(r);
}
+
+ ext_wirs->await_next_epoch();
+
size_t k = 1000;
uint64_t lower_key = 0;
uint64_t upper_key = 5;
@@ -277,6 +282,8 @@ START_TEST(t_tombstone_merging_01)
ck_assert(ext_wirs->validate_tombstone_proportion());
}
+ ext_wirs->await_next_epoch();
+
ck_assert(ext_wirs->validate_tombstone_proportion());
gsl_rng_free(rng);