summary | refs | log | tree | commit | diff | stats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
author: Douglas Rumbaugh <dbr4@psu.edu> 2024-01-31 18:41:17 -0500
committer: Douglas Rumbaugh <dbr4@psu.edu> 2024-01-31 18:41:17 -0500
commit: f3b7428cfa7f9364c5a8bc85107db3a7cccd53bc (patch)
tree: c6025935175e8c63e5dc1a116f7e605ca74885ce /include/framework/DynamicExtension.h
parent: 8fbcfda7270ef266f29f36b8eadcffaec2123612 (diff)
download: dynamic-extension-f3b7428cfa7f9364c5a8bc85107db3a7cccd53bc.tar.gz
Adjusted epoch transition methodology
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r-- include/framework/DynamicExtension.h 215
1 files changed, 119 insertions, 96 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index cc226d2..0992e14 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -43,6 +43,10 @@ class DynamicExtension {
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
+ struct epoch_ptr {
+ _Epoch *epoch;
+ size_t refcnt;
+ };
public:
DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
@@ -53,12 +57,14 @@ public:
, m_buffer(new Buffer(buffer_lwm, buffer_hwm))
, m_core_cnt(thread_cnt)
, m_next_core(0)
+ , m_epoch_cnt(0)
{
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(0, vers, m_buffer, 0);
+ m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
+ m_previous_epoch.store({nullptr, 0});
+ m_next_epoch.store({nullptr, 0});
m_versions.insert(vers);
- m_epochs.insert({0, epoch});
}
~DynamicExtension() {
@@ -66,16 +72,13 @@ public:
/* let any in-flight epoch transition finish */
await_next_epoch();
- /* deactivate the active epoch */
- get_active_epoch()->set_inactive();
-
/* shutdown the scheduler */
m_sched.shutdown();
/* delete all held resources */
- for (auto e : m_epochs) {
- delete e.second;
- }
+ delete m_next_epoch.load().epoch;
+ delete m_current_epoch.load().epoch;
+ delete m_previous_epoch.load().epoch;
delete m_buffer;
@@ -123,41 +126,41 @@ public:
}
size_t get_record_count() {
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count();
- epoch->end_job();
+ end_job(epoch);
return t;
}
size_t get_tombstone_count() {
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
- epoch->end_job();
+ end_job(epoch);
return t;
}
size_t get_height() {
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto t = epoch->get_structure()->get_height();
- epoch->end_job();
+ end_job(epoch);
return t;
}
size_t get_memory_usage() {
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
- epoch->end_job();
+ end_job(epoch);
return t;
}
size_t get_aux_memory_usage() {
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
- epoch->end_job();
+ end_job(epoch);
return t;
}
@@ -171,7 +174,7 @@ public:
await_next_epoch();
}
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
@@ -203,7 +206,7 @@ public:
delete shard;
}
- epoch->end_job();
+ end_job(epoch);
return flattened;
}
@@ -212,12 +215,10 @@ public:
* the newest one to become available. Otherwise, returns immediately.
*/
void await_next_epoch() {
- while (m_current_epoch.load() != m_newest_epoch.load()) {
+ while (m_next_epoch.load().epoch != nullptr) {
std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
m_epoch_cv.wait(lk);
}
-
- return;
}
/*
@@ -226,9 +227,9 @@ public:
* tombstone proportion invariant.
*/
bool validate_tombstone_proportion() {
- auto epoch = get_active_epoch_protected();
+ auto epoch = get_active_epoch();
auto t = epoch->get_structure()->validate_tombstone_proportion();
- epoch->end_job();
+ end_job(epoch);
return t;
}
@@ -247,15 +248,14 @@ private:
alignas(64) std::atomic<bool> m_reconstruction_scheduled;
- std::atomic<size_t> m_current_epoch;
- std::atomic<size_t> m_newest_epoch;
- std::unordered_map<size_t, _Epoch *> m_epochs;
+ std::atomic<epoch_ptr> m_next_epoch;
+ std::atomic<epoch_ptr> m_current_epoch;
+ std::atomic<epoch_ptr> m_previous_epoch;
std::condition_variable m_epoch_cv;
std::mutex m_epoch_cv_lk;
- std::mutex m_epoch_transition_lk;
- std::shared_mutex m_epoch_retire_lk;
+ std::atomic<size_t> m_epoch_cnt;
size_t m_scale_factor;
double m_max_delete_prop;
@@ -279,12 +279,6 @@ private:
auto wait = args->result.get_future();
- /*
- * the reconstruction process calls end_job(),
- * so we must start one before calling it
- */
- epoch->start_job();
-
m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
/* wait for compaction completion */
@@ -296,39 +290,38 @@ private:
}
_Epoch *get_active_epoch() {
- return m_epochs[m_current_epoch.load()];
- }
+ epoch_ptr old, new_ptr;
- _Epoch *get_active_epoch_protected() {
- m_epoch_retire_lk.lock_shared();
- m_struct_lock.lock();
- auto cur_epoch = m_current_epoch.load();
- m_epochs[cur_epoch]->start_job();
- m_struct_lock.unlock();
- m_epoch_retire_lk.unlock_shared();
+ do {
+ if (m_current_epoch.load().epoch == nullptr) {
+ old = m_previous_epoch;
+ new_ptr = {old.epoch, old.refcnt+1};
+ if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ } else {
+ old = m_current_epoch;
+ new_ptr = {old.epoch, old.refcnt+1};
+ if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ }
+ } while (true);
- return m_epochs[cur_epoch];
+ return new_ptr.epoch;
}
void advance_epoch(size_t buffer_head) {
- m_epoch_transition_lk.lock();
-
- size_t new_epoch_num = m_newest_epoch.load();
- size_t old_epoch_num = m_current_epoch.load();
- assert(new_epoch_num != old_epoch_num);
+ retire_epoch(m_previous_epoch.load().epoch);
- _Epoch *new_epoch = m_epochs[new_epoch_num];
- _Epoch *old_epoch = m_epochs[old_epoch_num];
+ epoch_ptr tmp = {nullptr, 0};
+ epoch_ptr cur;
+ do {
+ cur = m_current_epoch;
+ } while(!m_current_epoch.compare_exchange_strong(cur, tmp));
- /*
- * Verify the tombstone invariant within the epoch's structure, this
- * may require scheduling additional reconstructions.
- *
- * FIXME: having this inside the lock is going to TANK
- * insertion performance.
- */
- enforce_delete_invariant(new_epoch);
+ m_previous_epoch.store(cur);
// FIXME: this may currently block because there isn't any
// query preemption yet. At this point, we'd need to either
@@ -336,20 +329,19 @@ private:
// 2) kill all queries on the old_head
// 3) somehow migrate all queries on the old_head to the new
// version
- while (!new_epoch->advance_buffer_head(buffer_head)) {
+ while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
_mm_pause();
}
- m_current_epoch.fetch_add(1);
- old_epoch->set_inactive();
- m_epoch_transition_lk.unlock();
+
+ m_current_epoch.store(m_next_epoch);
+ m_next_epoch.store({nullptr, 0});
+
/* notify any blocking threads that the new epoch is available */
m_epoch_cv_lk.lock();
m_epoch_cv.notify_all();
m_epoch_cv_lk.unlock();
-
- retire_epoch(old_epoch);
}
/*
@@ -363,14 +355,20 @@ private:
* is violated, it is possible that this code will clone a retired
* epoch.
*/
- m_newest_epoch.fetch_add(1);
- auto new_epoch = get_active_epoch()->clone(m_newest_epoch.load());
+ assert(m_next_epoch.load().epoch == nullptr);
+ auto current_epoch = get_active_epoch();
+
+ m_epoch_cnt.fetch_add(1);
+ m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
+
+ end_job(current_epoch);
+
std::unique_lock<std::mutex> m_struct_lock;
- m_versions.insert(new_epoch->get_structure());
- m_epochs.insert({m_newest_epoch.load(), new_epoch});
+ m_versions.insert(m_next_epoch.load().epoch->get_structure());
m_struct_lock.release();
- return new_epoch;
+
+ return m_next_epoch.load().epoch;
}
void retire_epoch(_Epoch *epoch) {
@@ -384,28 +382,25 @@ private:
* proceed.
*/
+ if (epoch == nullptr) {
+ return;
+ }
+
+ epoch_ptr old, new_ptr;
+ new_ptr = {nullptr, 0};
do {
- if (epoch->retirable()) {
- m_epoch_retire_lk.lock();
- if (!epoch->retirable()) {
- m_epoch_retire_lk.unlock();
- continue;
- }
+ old = m_previous_epoch.load();
+
+ if (old.epoch == epoch && old.refcnt == 0 &&
+ m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
break;
}
- } while (true);
+ usleep(1);
+ } while(true);
- /* remove epoch from the framework's map */
- m_epochs.erase(epoch->get_epoch_number());
+ //fprintf(stderr, "Epoch %ld retired [%p]\n", epoch->get_epoch_number(), epoch);
- /*
- * The epoch's destructor will handle releasing
- * all the references it holds
- */
delete epoch;
- m_epoch_retire_lk.unlock();
-
- /* NOTE: the BufferView mechanism handles freeing unused buffer space */
/*
* Following the epoch's destruction, any buffers
@@ -453,7 +448,6 @@ private:
vers->flush_buffer(std::move(buffer_view));
}
- args->epoch->end_job();
args->result.set_value(true);
/*
@@ -473,8 +467,17 @@ private:
static void async_query(void *arguments) {
QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
- auto buffer = args->epoch->get_buffer();
- auto vers = args->epoch->get_structure();
+ auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch();
+
+ auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch;
+ auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch;
+ auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch;
+
+ //fprintf(stderr, "(%ld, %p)\t%p\t%p\t%p\n", epoch->get_epoch_number(), epoch, ptr1, ptr2, ptr3);
+
+
+ auto buffer = epoch->get_buffer();
+ auto vers = epoch->get_structure();
void *parms = args->query_parms;
/* Get the buffer query states */
@@ -509,7 +512,7 @@ private:
auto result = Q::merge(query_results, parms);
args->result_set.set_value(std::move(result));
- args->epoch->end_job();
+ ((DynamicExtension *) args->extension)->end_job(epoch);
Q::delete_buffer_query_state(buffer_state);
for (size_t i=0; i<states.size(); i++) {
@@ -525,7 +528,6 @@ private:
* the reconstruction process calls end_job(),
* so we must start one before calling it
*/
- epoch->start_job();
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
@@ -538,10 +540,8 @@ private:
}
std::future<std::vector<R>> schedule_query(void *query_parms) {
- auto epoch = get_active_epoch_protected();
-
QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>();
- args->epoch = epoch;
+ args->extension = this;
args->query_parms = query_parms;
auto result = args->result_set.get_future();
@@ -643,6 +643,29 @@ private:
::sched_setaffinity(0, sizeof(mask), &mask);
}
+
+ void end_job(_Epoch *epoch) {
+ epoch_ptr old, new_ptr;
+
+ do {
+ if (m_previous_epoch.load().epoch == epoch) {
+ old = m_previous_epoch;
+ assert(old.refcnt > 0);
+ new_ptr = {old.epoch, old.refcnt - 1};
+ if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ } else {
+ old = m_current_epoch;
+ assert(old.refcnt > 0);
+ new_ptr = {old.epoch, old.refcnt - 1};
+ if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ }
+ } while (true);
+ }
+
};
}