path: root/include/framework/DynamicExtension.h
author     Douglas Rumbaugh <dbr4@psu.edu>    2023-10-30 14:17:59 -0400
committer  Douglas Rumbaugh <dbr4@psu.edu>    2023-10-30 14:17:59 -0400
commit     39ae3e0441d8297a09197aba98bd494b5ada12c1 (patch)
tree       3bd5c8053ef17188ece2f1839d443df98875939f /include/framework/DynamicExtension.h
parent     3afacb7702e6d8fa67749a2a41dc776d315e02a9 (diff)
download   dynamic-extension-39ae3e0441d8297a09197aba98bd494b5ada12c1.tar.gz
Concurrency updates + fixes for compile errors
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--  include/framework/DynamicExtension.h  161
1 files changed, 81 insertions, 80 deletions
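
The core of the change is tracking buffers, versions, and epochs in sets/maps published through an atomic epoch counter, so a merge can build the next epoch while readers keep using the current one. A minimal sketch of that epoch-swap pattern, with simplified stand-in types rather than the framework's actual templates:

    // Illustrative only: Epoch stands in for the framework's Epoch<R, S, Q, L>.
    #include <atomic>
    #include <unordered_map>

    struct Epoch;  // holds a Structure* plus the buffers visible in that epoch

    std::atomic<size_t> current_epoch{0};
    std::unordered_map<size_t, Epoch *> epochs;  // still needs external locking

    Epoch *get_active_epoch() {
        // readers resolve the counter to an already-built epoch
        return epochs[current_epoch.load()];
    }

    void advance_epoch(Epoch *next) {
        // install the next epoch first, then publish it by bumping the counter
        epochs[current_epoch.load() + 1] = next;
        current_epoch.fetch_add(1);
    }

Publishing through the counter is what lets the append and query paths call get_active_epoch() without waiting on merges; in the diff itself, the sets of buffers and versions are guarded separately by m_struct_lock.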
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6936247..d2a6b7a 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -39,7 +39,7 @@ class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef Epoch<R, S, Q, L> Epoch;
+ typedef Epoch<R, S, Q, L> _Epoch;
typedef BufferView<R, Q> BufView;
public:
@@ -53,20 +53,24 @@ public:
{
auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
- auto epoch = new Epoch(vers, buf);
+ auto epoch = new _Epoch(vers, buf);
- m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
- m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop));
- m_epochs.push_back({0, epoch});
+ m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
+ m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop));
+ m_epochs.insert({0, epoch});
}
~DynamicExtension() {
- for (size_t i=0; i<m_buffers.size(); i++) {
- delete m_buffers[i];
+ for (auto e : m_buffers) {
+ delete e;
}
- for (size_t i=0; i<m_versions.size(); i++) {
- delete m_versions[i];
+ for (auto e : m_versions) {
+ delete e;
+ }
+
+ for (auto e : m_epochs) {
+ delete e.second;
}
}
@@ -87,7 +91,7 @@ public:
* probably has the lowest probability of having the record,
* so we'll check it last.
*/
- return buffers->delete_record(rec);
+ return buffers.delete_record(rec);
}
/*
@@ -97,7 +101,7 @@ public:
}
std::future<std::vector<R>> query(void *parms) {
- return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms);
+ return schedule_query(parms);
}
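
With the scheduler handling execution, query() now returns a std::future. Caller-side, the flow looks roughly like this (snippet only, assuming an existing DynamicExtension instance `ext`, record type R, and parameters built for the query type Q):

    // hypothetical caller
    void *parms = nullptr;                  // would be Q-specific query parameters
    std::future<std::vector<R>> fut = ext.query(parms);
    std::vector<R> results = fut.get();     // blocks until async_query fulfills the promise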
size_t get_record_count() {
@@ -183,36 +187,37 @@ private:
std::set<Structure *> m_versions;
std::atomic<size_t> m_current_epoch;
- std::unordered_map<size_t, Epoch *> m_epochs;
+ std::unordered_map<size_t, _Epoch *> m_epochs;
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_capacity;
size_t m_buffer_delete_capacity;
- Epoch *get_active_epoch() {
+ _Epoch *get_active_epoch() {
return m_epochs[m_current_epoch.load()];
}
void advance_epoch() {
size_t new_epoch_num = m_current_epoch.load() + 1;
- Epoch *new_epoch = m_epochs[new_epoch_num];
- Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+ _Epoch *new_epoch = m_epochs[new_epoch_num];
+ _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
// Update the new Epoch to contain the buffers
// from the old one that it doesn't currently have
size_t old_buffer_cnt = new_epoch->clear_buffers();
for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers[i]);
+ new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
m_current_epoch.fetch_add(1);
+ retire_epoch(old_epoch);
}
/*
* Creates a new epoch by copying the currently active one. The new epoch's
* structure will be a shallow copy of the old one's.
*/
- Epoch *create_new_epoch() {
+ _Epoch *create_new_epoch() {
auto new_epoch = get_active_epoch()->clone();
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
@@ -228,8 +233,8 @@ private:
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
- Buffer *add_empty_buffer(Epoch *epoch) {
- auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+ Buffer *add_empty_buffer(_Epoch *epoch) {
+ auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
m_buffers.insert(new_buffer);
@@ -239,7 +244,7 @@ private:
return new_buffer;
}
- void retire_epoch(Epoch *epoch) {
+ void retire_epoch(_Epoch *epoch) {
/*
* Epochs with currently active jobs cannot
* be retired. By the time retire_epoch is called,
@@ -294,26 +299,15 @@ private:
vers->merge_buffer(buff);
- args->result.set_value(true);
args->epoch->end_job();
- delete args;
- }
-
- static std::vector<R> finalize_query_result(std::vector<std::vector<Wrapped<R>>> &query_results, void *parms,
- std::vector<void *> &shard_states, std::vector<void *> &buffer_states) {
- auto result = Q::merge(query_results, parms);
-
- for (size_t i=0; i<buffer_states.size(); i++) {
- Q::delete_buffer_query_state(buffer_states[i]);
- }
-
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(shard_states[i]);
- }
+ args->result.set_value(true);
- return result;
+ ((DynamicExtension *) args->extension)->advance_epoch();
+
+ // FIXME: this might break things... not sure
+ delete args;
}
-
+
static void async_query(void *arguments) {
QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
@@ -322,58 +316,56 @@ private:
void *parms = args->query_parms;
// Get the buffer query states
- std::vector<void *> buffer_states = buffers->get_buffer_query_states(parms);
+ std::vector<void *> buffer_states = buffers.get_query_states(parms);
// Get the shard query states
std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void *> shard_states = vers->get_query_states(shards, parms);
+ std::vector<void *> states = vers->get_query_states(shards, parms);
- Q::process_query_states(parms, shard_states, buffer_states);
+ Q::process_query_states(parms, states, buffer_states);
std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+ for (size_t i=0; i<query_results.size(); i++) {
+ std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
+ ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
+ : Q::query(shards[i - buffer_states.size()].second,
+ states[i - buffer_states.size()], parms);
+ ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
+ query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
- // Execute the query for the buffer
- std::vector<std::vector<Wrapped<R>>> buffer_results(buffer_states.size());
- for (size_t i=0; i<buffer_states.size(); i++) {
- auto buffer_results = Q::buffer_query(buffers->get_buffers[i], buffer_states[i], parms);
- query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers));
-
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i] > 0) {
- return finalize_query_result(query_results, parms, buffer_states, shard_states);
- }
- }
- }
-
- // Execute the query for each shard
- for (size_t i=0; i<shards.size(); i++) {
- auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+buffer_states.size()] = std::move(filter_deletes(shard_results, shards[i].first, buffers, vers));
if constexpr (Q::EARLY_ABORT) {
- if (query_results[i+buffer_states.size()].size() > 0) {
- return finalize_query_result(query_results, parms, buffer_states, shard_states);
- }
+ if (query_results[i].size() > 0) break;
}
}
-
- // Merge the results together and finalize the job
- auto result = finalize_query_result(query_results, parms, buffer_states, shard_states);
+
+ auto result = Q::merge(query_results, parms);
args->result_set.set_value(std::move(result));
args->epoch->end_job();
+
+ for (size_t i=0; i<buffer_states.size(); i++) {
+ Q::delete_buffer_query_state(buffer_states[i]);
+ }
+
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ // FIXME: this might break things... not sure
delete args;
}
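
The result hand-off above is the standard promise/future pairing: schedule_query() returns the future held in QueryArgs, and async_query() fulfills it with set_value. The same mechanism in isolation, using plain standard-library types rather than the framework's QueryArgs:

    #include <future>
    #include <thread>
    #include <utility>
    #include <vector>

    int main() {
        std::promise<std::vector<int>> result_set;              // worker fulfills this
        std::future<std::vector<int>> fut = result_set.get_future();

        std::thread worker([p = std::move(result_set)]() mutable {
            std::vector<int> results{1, 2, 3};                  // stand-in for query output
            p.set_value(std::move(results));                    // analogous to args->result_set.set_value(...)
        });

        auto results = fut.get();                               // caller blocks here, as in query().get()
        worker.join();
        return 0;
    }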
- std::future<bool> schedule_merge() {
- auto epoch = get_active_epoch();
+ void schedule_merge() {
+ auto epoch = create_new_epoch();
epoch->start_job();
MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]);
+ // FIXME: all full buffers can be merged at this point--but that requires
+ // retooling the shard interface a bit to do efficiently.
+ args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
+ args->extension = this;
m_sched.schedule_job(merge, 0, args);
-
- return args->result.get_future();
}
std::future<std::vector<R>> schedule_query(void *query_parms) {
@@ -389,20 +381,29 @@ private:
}
int internal_append(const R &rec, bool ts) {
- Buffer *buffer;
- while (!(buffer = get_active_epoch()->get_active_buffer()))
- ;
-
- if (buffer->is_full()) {
- auto vers = get_active_epoch()->get_structure();
- auto res = schedule_merge(vers, buffer);
- res.get();
- }
+ Buffer *buffer = nullptr;
+ do {
+ auto epoch = get_active_epoch();
+
+ while (!(buffer = epoch->get_active_buffer()))
+ ;
+
+ /* if the buffer is full, schedule a merge and add a new empty buffer */
+ if (buffer->is_full()) {
+ // FIXME: possible race here--two identical merges could be scheduled
+ auto vers = epoch->get_structure();
+ schedule_merge();
+ buffer = add_empty_buffer(epoch);
+ }
+
+ } while(!buffer->append(rec, ts));
- return buffer->append(rec, ts);
+ /* internal append should always succeed, eventually */
+ return 1;
}
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, BufView *buffers, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
+ BufView &buffers, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -431,7 +432,7 @@ private:
continue;
}
- if (buffers->check_tombstone(rec.rec)) {
+ if (buffers.check_tombstone(rec.rec)) {
continue;
}
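
The tail of the diff is the delete filter: a record survives only if it is not itself a tombstone and no matching tombstone is found in the buffers or in a shard that shadows it. A simplified, self-contained sketch of that rule; WrappedRec and has_tombstone are stand-ins, not the framework's Wrapped<R> or BufView API:

    #include <functional>
    #include <vector>

    // Minimal stand-in for the framework's wrapped record type.
    template <typename R>
    struct WrappedRec {
        R rec;
        bool is_tombstone;
    };

    // Keep only records with no matching tombstone reported by `has_tombstone`.
    template <typename R>
    std::vector<WrappedRec<R>> filter_deletes_sketch(
            const std::vector<WrappedRec<R>> &records,
            const std::function<bool(const R &)> &has_tombstone) {
        std::vector<WrappedRec<R>> out;
        for (const auto &w : records) {
            if (w.is_tombstone) continue;        // tombstones never appear in results
            if (has_tombstone(w.rec)) continue;  // deleted elsewhere (buffer or older shard)
            out.push_back(w);
        }
        return out;
    }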