summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-10-30 17:15:05 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-10-30 17:15:05 -0400
commitd2279e1b96d352a0af1d425dcaaf93e8a26a8d52 (patch)
tree4e8df98339ff5578deb8f8be46b9f6c3cc34cef4 /include/framework/DynamicExtension.h
parent8ce1cb0eef7d5631f0f7788804845ddc8296ac6f (diff)
downloaddynamic-extension-d2279e1b96d352a0af1d425dcaaf93e8a26a8d52.tar.gz
General Comment + Consistency updates
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h46
1 files changed, 27 insertions, 19 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index f2bbacc..9129060 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -1,7 +1,7 @@
/*
* include/framework/DynamicExtension.h
*
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* All rights reserved. Published under the Modified BSD License.
@@ -107,8 +107,7 @@ public:
}
size_t get_record_count() {
- auto epoch = get_active_epoch();
- epoch->start_job();
+ auto epoch = get_active_epoch_protected();
auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count();
epoch->end_job();
@@ -116,8 +115,7 @@ public:
}
size_t get_tombstone_count() {
- auto epoch = get_active_epoch();
- epoch->start_job();
+ auto epoch = get_active_epoch_protected();
auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
epoch->end_job();
@@ -129,8 +127,7 @@ public:
}
size_t get_memory_usage() {
- auto epoch = get_active_epoch();
- epoch->start_job();
+ auto epoch = get_active_epoch_protected();
auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage();
epoch->end_job();
@@ -138,8 +135,7 @@ public:
}
size_t get_aux_memory_usage() {
- auto epoch = get_active_epoch();
- epoch->start_job();
+ auto epoch = get_active_epoch_protected();
auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
epoch->end_job();
@@ -151,9 +147,8 @@ public:
}
Shard *create_static_structure() {
- auto epoch = get_active_epoch();
+ auto epoch = get_active_epoch_protected();
auto bv = epoch->get_buffer_view();
- epoch->start_job();
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
@@ -219,6 +214,11 @@ private:
return m_epochs[m_current_epoch.load()];
}
+ _Epoch *get_active_epoch_protected() {
+ m_epochs[m_current_epoch.load()]->start_job();
+ return m_epochs[m_current_epoch.load()];
+ }
+
void advance_epoch() {
size_t new_epoch_num = m_current_epoch.load() + 1;
_Epoch *new_epoch = m_epochs[new_epoch_num];
@@ -241,6 +241,12 @@ private:
* structure will be a shallow copy of the old one's.
*/
_Epoch *create_new_epoch() {
+ /*
+ * This epoch access is _not_ protected under the assumption that
+ * only one merge will be able to trigger at a time. If that condition
+ * is violated, it is possible that this code will clone a retired
+ * epoch.
+ */
auto new_epoch = get_active_epoch()->clone();
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
@@ -311,6 +317,8 @@ private:
MergeArgs<R, S, Q, L> *args = (MergeArgs<R, S, Q, L> *) arguments;
Structure *vers = args->epoch->get_structure();
+ // FIXME: with an improved shard interface, multiple full buffers
+ // could be merged at once here.
Buffer *buff = (Buffer *) args->epoch->get_buffers()[0];
for (ssize_t i=args->merges.size() - 1; i>=0; i--) {
@@ -387,24 +395,23 @@ private:
}
std::future<std::vector<R>> schedule_query(void *query_parms) {
- auto epoch = get_active_epoch();
- epoch->start_job();
+ auto epoch = get_active_epoch_protected();
QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>();
args->epoch = epoch;
args->query_parms = query_parms;
+ auto result = args->result_set.get_future();
+
m_sched.schedule_job(async_query, 0, args);
- return args->result_set.get_future();
+ return result;
}
int internal_append(const R &rec, bool ts) {
Buffer *buffer = nullptr;
do {
- auto epoch = get_active_epoch();
-
- while (!(buffer = epoch->get_active_buffer()))
- ;
+ auto epoch = get_active_epoch_protected();
+ buffer = epoch->get_active_buffer();
/* if the buffer is full, schedule a merge and add a new empty buffer */
if (buffer->is_full()) {
@@ -413,7 +420,8 @@ private:
schedule_merge();
buffer = add_empty_buffer(epoch);
}
-
+ // FIXME: not exactly the best spot for this
+ epoch->end_job();
} while(!buffer->append(rec, ts));
/* internal append should always succeed, eventually */