Diffstat:
-rw-r--r--   .gitmodules                                     |  3
-rw-r--r--   CMakeLists.txt                                  | 10
m---------   external/ctpl                                   |  0
-rw-r--r--   include/framework/DynamicExtension.h            | 77
-rw-r--r--   include/framework/scheduling/Epoch.h            | 45
-rw-r--r--   include/framework/scheduling/FIFOScheduler.h    | 13
-rw-r--r--   include/framework/scheduling/SerialScheduler.h  |  2
-rw-r--r--   include/framework/scheduling/Task.h             |  2
-rw-r--r--   include/framework/structure/InternalLevel.h     | 81
-rw-r--r--   include/framework/structure/MutableBuffer.h     | 17
-rw-r--r--   tests/dynamic_extension_tests.inc               |  6
11 files changed, 170 insertions(+), 86 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index 7616a12..0b195bd 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -16,3 +16,6 @@
[submodule "external/psudb-common"]
path = external/psudb-common
url = git@github.com:PSU-Database-Systems-Group/psudb-common
+[submodule "external/ctpl"]
+ path = external/ctpl
+ url = git@github.com:vit-vit/CTPL.git
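Note: the new dependency is vendored as a submodule, so a plain fetch will not populate external/ctpl; with a standard git setup, running

    git submodule update --init --recursive external/ctpl

after checkout (or cloning with --recurse-submodules) brings it in.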
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c4f28e0..d0e14c1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,7 +12,7 @@ set(bench false)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin")
-add_compile_options(-Iinclude -Iexternal/PLEX/include)
+add_compile_options(-Iinclude -Iexternal/PLEX/include -Iexternal)
if (debug)
add_compile_options(-g -O0)
@@ -47,19 +47,19 @@ if (tests)
add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
- target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
- target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_tier_tomb PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
- target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
- target_include_directories(de_level_tomb PRIVATE include external/psudb-common/cpp/include)
+ target_include_directories(de_level_tomb PRIVATE include external/ctpl external/psudb-common/cpp/include external)
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
diff --git a/external/ctpl b/external/ctpl
new file mode 160000
+Subproject commit 437e135dbd94eb65b45533d9ce8ee28b5bd37b6
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 0858fc3..233bebb 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -198,8 +198,8 @@ public:
*/
void await_next_epoch() {
while (m_current_epoch.load() != m_newest_epoch.load()) {
- std::unique_lock<std::mutex> m_epoch_cv_lk;
- m_epoch_cv.wait(m_epoch_cv_lk);
+ std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
+ m_epoch_cv.wait(lk);
}
return;
@@ -238,14 +238,26 @@ private:
}
_Epoch *get_active_epoch_protected() {
- m_epochs[m_current_epoch.load()]->start_job();
- return m_epochs[m_current_epoch.load()];
+ ssize_t cur_epoch = -1;
+ do {
+ if (cur_epoch != -1) {
+ m_epochs[cur_epoch]->end_job();
+ }
+
+ cur_epoch = m_current_epoch.load();
+ m_epochs[cur_epoch]->start_job();
+ } while (cur_epoch != m_current_epoch.load());
+
+ return m_epochs[cur_epoch];
}
void advance_epoch() {
size_t new_epoch_num = m_newest_epoch.load();
+ size_t old_epoch_num = m_current_epoch.load();
+ assert(new_epoch_num != old_epoch_num);
+
_Epoch *new_epoch = m_epochs[new_epoch_num];
- _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+ _Epoch *old_epoch = m_epochs[old_epoch_num];
/*
* Update the new Epoch to contain the buffers from the old one
@@ -255,7 +267,11 @@ private:
*/
if constexpr (!std::same_as<SCHED, SerialScheduler>) {
size_t old_buffer_cnt = new_epoch->clear_buffers();
- for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
+ // FIXME: this is getting nightmarish... The -1 here ensures that the
+ // empty buffer added when the merge was first triggered is also included.
+ // Due to the reordering of operations in internal_append, the new buffer
+ // exists at the time of the clone, and so is already in the new epoch.
+ for (size_t i=old_buffer_cnt-1; i<old_epoch->get_buffers().size(); i++) {
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
@@ -298,14 +314,22 @@ private:
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
- Buffer *add_empty_buffer(_Epoch *epoch) {
- auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+ Buffer *add_empty_buffer(_Epoch *epoch, Buffer *current_buffer=nullptr) {
+ auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
+ auto new_buffer = epoch->add_buffer(temp_buffer, current_buffer);
+ /*
+ * if epoch->add_buffer doesn't add the new buffer, this insert
+ * won't update the buffer set (duplicate insert)
+ */
m_buffers.insert(new_buffer);
m_struct_lock.release();
- epoch->add_buffer(new_buffer);
+ if (new_buffer != temp_buffer) {
+ delete temp_buffer;
+ }
+
return new_buffer;
}
@@ -448,24 +472,41 @@ private:
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
+ int res = 0;
do {
- // FIXME: figure out best way to protect this epoch access
- auto epoch = get_active_epoch();
+ auto epoch = get_active_epoch_protected();
buffer = epoch->get_active_buffer();
+ assert(buffer);
- /* if the buffer is full, schedule a merge and add a new empty buffer */
+ /*
+ * If the buffer is full and there is no current merge,
+ * schedule a merge and add a new empty buffer. If there
+ * is a current merge, then just add a new empty buffer
+ * to the current epoch.
+ */
if (buffer->is_full()) {
- // FIXME: possible race here--two identical merges could be scheduled
- auto vers = epoch->get_structure();
- schedule_merge();
-
if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ /* single threaded: run the merge, then empty the buffer */
+ epoch->end_job();
+ schedule_merge();
buffer->truncate();
- } else {
+ continue;
+ } else if (epoch->prepare_merge()) {
+ /*
+ * add an empty buffer so the insert can proceed, and
+ * schedule a merge on a background thread
+ */
buffer = add_empty_buffer(epoch);
+ schedule_merge();
+ } else {
+ /* background merge is ongoing, so just add empty buffer */
+ buffer = add_empty_buffer(epoch, buffer);
}
}
- } while(!buffer->append(rec, ts));
+
+ res = buffer->append(rec, ts);
+ epoch->end_job();
+ } while(!res);
/* internal append should always succeed, eventually */
return 1;
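The retry loop added to get_active_epoch_protected() is a pin-then-validate idiom: pin whatever epoch was current at the load, re-check that it is still current, and unpin and retry if an epoch advance raced in. A minimal standalone sketch of the idiom, with hypothetical names rather than the framework's actual types:

    #include <atomic>
    #include <cstddef>

    struct Epoch {
        std::atomic<int> active_jobs{0};
        void start_job() { active_jobs.fetch_add(1); }   /* pin */
        void end_job()   { active_jobs.fetch_sub(1); }   /* unpin */
    };

    Epoch *pin_current_epoch(std::atomic<size_t> &current, Epoch *epochs[]) {
        while (true) {
            size_t e = current.load();
            epochs[e]->start_job();
            if (e == current.load()) {
                return epochs[e];    /* still current; safe until end_job() */
            }
            epochs[e]->end_job();    /* an epoch advance raced in; retry */
        }
    }

Once the second load agrees with the first, the pinned epoch's job count is nonzero, which is what keeps it alive until the caller's end_job().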
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index f4aefe9..0ebbde9 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -25,6 +25,7 @@ public:
Epoch(size_t number=0)
: m_buffers()
, m_structure(nullptr)
+ , m_active_merge(false)
, m_active_jobs(0)
, m_active(true)
, m_epoch_number(number)
@@ -34,6 +35,7 @@ public:
: m_buffers()
, m_structure(structure)
, m_active_jobs(0)
+ , m_active_merge(false)
, m_active(true)
, m_epoch_number(number)
{
@@ -54,11 +56,25 @@ public:
}
}
- void add_buffer(Buffer *buf) {
+ Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
assert(buf);
+ /*
+ * if a current buffer is specified, only add the
+ * new buffer if the active buffer is the current one;
+ * otherwise just return the active buffer (a poor
+ * man's CAS).
+ */
+ if (cur_buf) {
+ auto active_buf = get_active_buffer();
+ if (active_buf != cur_buf) {
+ return active_buf;
+ }
+ }
+
buf->take_reference();
m_buffers.push_back(buf);
+ return buf;
}
void start_job() {
@@ -137,6 +153,31 @@ public:
return epoch;
}
+ /*
+ * Check if a merge can be started from this Epoch.
+ * At present, without concurrent merging, this simply
+ * checks if there is currently a scheduled merge based
+ * on this Epoch. If there is, returns false. If there
+ * isn't, return true and set a flag indicating that
+ * there is an active merge.
+ */
+ bool prepare_merge() {
+ auto old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+
+ // FIXME: this needs to be cleaned up
+ while (!m_active_merge.compare_exchange_strong(old, true)) {
+ old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
void set_inactive() {
m_active = false;
}
@@ -170,6 +211,8 @@ private:
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
+ std::atomic<bool> m_active_merge;
+
/*
* The number of currently active jobs
* (queries/merges) operating on this
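The FIXME in prepare_merge() is fair: the load followed by a CAS loop can collapse into a single compare_exchange_strong, since a failed exchange already reports that another thread won. A sketch of the tighter equivalent (same semantics, not what this commit ships):

    #include <atomic>

    std::atomic<bool> m_active_merge{false};

    /* Returns true exactly once per merge cycle: the caller that flips
     * the flag from false to true wins the right to schedule the merge. */
    bool prepare_merge() {
        bool expected = false;
        return m_active_merge.compare_exchange_strong(expected, true);
    }

Presumably the flag is reset when the merge completes and the next epoch is installed; the diff does not show that path.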
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 91a72b3..1521eb6 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -24,20 +24,26 @@
#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Task.h"
+#include "ctpl/ctpl.h"
#include "psu-ds/LockedPriorityQueue.h"
namespace de {
+
class FIFOScheduler {
+private:
+ static const size_t DEFAULT_MAX_THREADS = 8;
+
public:
FIFOScheduler(size_t memory_budget, size_t thread_cnt)
: m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
- , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
, m_used_memory(0)
, m_used_thrds(0)
, m_shutdown(false)
{
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+ m_thrd_pool.resize(m_thrd_cnt);
}
~FIFOScheduler() {
@@ -72,6 +78,7 @@ private:
std::condition_variable m_cv;
std::thread m_sched_thrd;
+ ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_thrds;
std::atomic<size_t> m_used_memory;
@@ -79,7 +86,7 @@ private:
void schedule_next() {
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
- t();
+ m_thrd_pool.push(t);
}
void run() {
@@ -87,7 +94,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_task_queue.size() > 0 && m_used_thrds.load() < m_thrd_cnt) {
+ while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
schedule_next();
}
} while(!m_shutdown.load());
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index 9c767e8..93611d1 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -47,7 +47,7 @@ public:
void schedule_job(std::function<void(void*)> job, size_t size, void *args) {
size_t ts = m_counter++;
auto t = Task(size, ts, job, args);
- t();
+ t(0);
}
void shutdown() {
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 228665f..6dfd7df 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -54,7 +54,7 @@ struct Task {
return self.m_timestamp > other.m_timestamp;
}
- void operator()() {
+ void operator()(size_t thrd_id) {
m_job(m_args);
}
};
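For context on the two scheduler changes above: CTPL's thread_pool::push() passes the worker thread's id as the first argument to the callable, which is why Task::operator() grew a thread-id parameter and why SerialScheduler now invokes t(0) with a dummy id. A minimal usage sketch of the CTPL API (worth verifying against the pinned submodule commit; note that ctpl.h depends on boost::lockfree, while the library also ships an STL-only ctpl_stl.h):

    #include "ctpl/ctpl.h"   /* vit-vit/CTPL, vendored under external/ */
    #include <cstdio>

    int main() {
        ctpl::thread_pool pool(4);   /* or default-construct and resize(4) */

        /* push() hands the job its worker id and returns a std::future */
        auto fut = pool.push([](int thrd_id) {
            std::printf("running on worker %d\n", thrd_id);
            return 42;
        });

        return fut.get() == 42 ? 0 : 1;
    }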
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 7a7b98c..632fe17 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -33,32 +33,10 @@ public:
: m_level_no(level_no)
, m_shard_cnt(0)
, m_shards(shard_cap, nullptr)
- , m_owns(shard_cap, true)
, m_pending_shard(nullptr)
{}
- // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
- // WARNING: for leveling only.
- InternalLevel(InternalLevel* level)
- : m_level_no(level->m_level_no + 1)
- , m_shard_cnt(level->m_shard_cnt)
- , m_shards(level->m_shards.size(), nullptr)
- , m_owns(level->m_owns.size(), true)
- , m_pending_shard(nullptr)
- {
- assert(m_shard_cnt == 1 && m_shards.size() == 1);
-
- for (size_t i=0; i<m_shards.size(); i++) {
- level->m_owns[i] = false;
- m_shards[i] = level->m_shards[i];
- }
- }
-
~InternalLevel() {
- for (size_t i=0; i<m_shards.size(); i++) {
- if (m_owns[i]) delete m_shards[i];
- }
-
delete m_pending_shard;
}
@@ -69,10 +47,10 @@ public:
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
Shard* shards[2];
- shards[0] = base_level->m_shards[0];
- shards[1] = new_level->m_shards[0];
+ shards[0] = base_level->m_shards[0].get();
+ shards[1] = new_level->m_shards[0].get();
- res->m_shards[0] = new S(shards, 2);
+ res->m_shards[0] = std::make_shared<S>(shards, 2);
return std::shared_ptr<InternalLevel>(res);
}
@@ -83,19 +61,23 @@ public:
return;
}
- m_shards[m_shard_cnt] = new S(buffer);
- m_owns[m_shard_cnt] = true;
+ m_shards[m_shard_cnt] = std::make_shared<S>(buffer);
++m_shard_cnt;
}
void append_merged_shards(InternalLevel* level) {
+ Shard *shards[level->m_shard_cnt];
+ for (size_t i=0; i<level->m_shard_cnt; i++) {
+ shards[i] = level->m_shards[i].get();
+ }
+
if (m_shard_cnt == m_shards.size()) {
- m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt);
+ m_pending_shard = new S(shards, level->m_shard_cnt);
return;
}
- m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
- m_owns[m_shard_cnt] = true;
+ auto tmp = new S(shards, level->m_shard_cnt);
+ m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
++m_shard_cnt;
}
@@ -104,15 +86,10 @@ public:
void finalize() {
if (m_pending_shard) {
for (size_t i=0; i<m_shards.size(); i++) {
- if (m_owns[i]) {
- delete m_shards[i];
- m_shards[i] = nullptr;
- m_owns[i] = false;
- }
+ m_shards[i] = nullptr;
}
- m_shards[0] = m_pending_shard;
- m_owns[0] = true;
+ m_shards[0] = std::shared_ptr<S>(m_pending_shard);
m_pending_shard = nullptr;
m_shard_cnt = 1;
}
@@ -126,7 +103,7 @@ public:
Shard *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = m_shards[i];
+ shards[i] = m_shards[i].get();
}
return new S(shards, m_shard_cnt);
@@ -136,8 +113,8 @@ public:
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
if (m_shards[i]) {
- auto shard_state = Q::get_query_state(m_shards[i], query_parms);
- shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
+ auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()});
shard_states.emplace_back(shard_state);
}
}
@@ -174,7 +151,7 @@ public:
}
Shard* get_shard(size_t idx) {
- return m_shards[idx];
+ return m_shards[idx].get();
}
size_t get_shard_count() {
@@ -184,7 +161,9 @@ public:
size_t get_record_count() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_shards[i]->get_record_count();
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_record_count();
+ }
}
return cnt;
@@ -193,7 +172,9 @@ public:
size_t get_tombstone_count() {
size_t res = 0;
for (size_t i = 0; i < m_shard_cnt; ++i) {
- res += m_shards[i]->get_tombstone_count();
+ if (m_shards[i]) {
+ res += m_shards[i]->get_tombstone_count();
+ }
}
return res;
}
@@ -201,7 +182,9 @@ public:
size_t get_aux_memory_usage() {
size_t cnt = 0;
for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_shards[i]->get_aux_memory_usage();
+ if (m_shards[i]){
+ cnt += m_shards[i]->get_aux_memory_usage();
+ }
}
return cnt;
@@ -224,7 +207,7 @@ public:
for (size_t i=0; i<m_shard_cnt; i++) {
if (m_shards[i]) {
tscnt += m_shards[i]->get_tombstone_count();
- reccnt += (*m_shards[i])->get_record_count();
+ reccnt += m_shards[i]->get_record_count();
}
}
@@ -235,8 +218,6 @@ public:
auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
for (size_t i=0; i<m_shard_cnt; i++) {
new_level->m_shards[i] = m_shards[i];
- new_level->m_owns[i] = true;
- m_owns[i] = false;
}
return new_level;
@@ -248,12 +229,8 @@ private:
size_t m_shard_cnt;
size_t m_shard_size_cap;
- std::vector<Shard*> m_shards;
-
+ std::vector<std::shared_ptr<Shard>> m_shards;
Shard *m_pending_shard;
-
- std::vector<bool> m_owns;
-
};
}
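The larger point of the InternalLevel rewrite is that std::shared_ptr subsumes the old m_owns bitmap: every level holding a copy of a shard's shared_ptr co-owns it, clone() no longer has to transfer ownership flags, and the shard is freed exactly when its last holder disappears. A toy illustration with a stand-in Shard type:

    #include <cstdio>
    #include <memory>
    #include <vector>

    struct Shard {
        ~Shard() { std::printf("shard destroyed\n"); }
    };

    int main() {
        std::vector<std::shared_ptr<Shard>> level_a, level_b;

        level_a.push_back(std::make_shared<Shard>());
        level_b.push_back(level_a[0]);   /* clone: share, don't transfer */

        level_a.clear();                 /* shard survives; level_b still owns it */
        std::printf("level_a cleared\n");
        level_b.clear();                 /* last owner gone; destructor runs here */
    }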
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index a70b86b..ba25cc3 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -33,7 +33,7 @@ class MutableBuffer {
public:
MutableBuffer(size_t capacity, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
- , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
+ , m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_tombstone_filter = nullptr;
@@ -83,6 +83,7 @@ public:
m_weight.fetch_add(1);
}
+ m_reccnt.fetch_add(1);
return 1;
}
@@ -91,6 +92,7 @@ public:
m_reccnt.store(0);
m_weight.store(0);
m_max_weight.store(0);
+ m_tail.store(0);
if (m_tombstone_filter) m_tombstone_filter->clear();
return true;
@@ -193,11 +195,15 @@ public:
}
private:
- int32_t try_advance_tail() {
- size_t new_tail = m_reccnt.fetch_add(1);
+ int64_t try_advance_tail() {
+ int64_t new_tail = m_tail.fetch_add(1);
- if (new_tail < m_cap) return new_tail;
- else return -1;
+ if (new_tail < m_cap) {
+ return new_tail;
+ }
+
+ m_tail.fetch_add(-1);
+ return -1;
}
size_t m_cap;
@@ -210,6 +216,7 @@ private:
alignas(64) std::atomic<size_t> m_tombstonecnt;
alignas(64) std::atomic<uint32_t> m_reccnt;
+ alignas(64) std::atomic<int64_t> m_tail;
alignas(64) std::atomic<double> m_weight;
alignas(64) std::atomic<double> m_max_weight;
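The try_advance_tail() rework separates slot reservation (the new m_tail) from the visible record count (m_reccnt, now bumped only after a successful append). A writer reserves a slot with fetch_add; on a full buffer it rolls the reservation back, so m_tail stays bounded near capacity instead of racing away under contention. The pattern in isolation, with hypothetical names:

    #include <atomic>
    #include <cstdint>

    constexpr int64_t CAP = 1024;
    alignas(64) std::atomic<int64_t> tail{0};

    /* Reserve a slot index, or return -1 if the buffer is full. The
     * decrement on failure undoes the losing reservation. */
    int64_t try_advance_tail() {
        int64_t slot = tail.fetch_add(1);
        if (slot < CAP) {
            return slot;
        }
        tail.fetch_add(-1);
        return -1;
    }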
diff --git a/tests/dynamic_extension_tests.inc b/tests/dynamic_extension_tests.inc
index aa12e31..bcb5ae6 100644
--- a/tests/dynamic_extension_tests.inc
+++ b/tests/dynamic_extension_tests.inc
@@ -76,6 +76,8 @@ START_TEST(t_insert_with_mem_merges)
val++;
}
+ test_de->await_next_epoch();
+
ck_assert_int_eq(test_de->get_record_count(), 300);
ck_assert_int_eq(test_de->get_height(), 1);
@@ -174,6 +176,8 @@ START_TEST(t_range_query)
ck_assert_int_eq(test_de->insert(r), 1);
}
+ test_de->await_next_epoch();
+
std::sort(keys.begin(), keys.end());
auto idx = rand() % (keys.size() - 250);
@@ -245,6 +249,8 @@ START_TEST(t_tombstone_merging_01)
ck_assert(test_de->validate_tombstone_proportion());
}
+ test_de->await_next_epoch();
+
ck_assert(test_de->validate_tombstone_proportion());
gsl_rng_free(rng);