| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-30 14:17:59 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-10-30 14:17:59 -0400 |
| commit | 39ae3e0441d8297a09197aba98bd494b5ada12c1 (patch) | |
| tree | 3bd5c8053ef17188ece2f1839d443df98875939f /include/framework/DynamicExtension.h | |
| parent | 3afacb7702e6d8fa67749a2a41dc776d315e02a9 (diff) | |
| download | dynamic-extension-39ae3e0441d8297a09197aba98bd494b5ada12c1.tar.gz | |
Concurrency updates + fixes for compile errors
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 161 |
1 file changed, 81 insertions, 80 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6936247..d2a6b7a 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -39,7 +39,7 @@ class DynamicExtension {
     typedef S Shard;
     typedef MutableBuffer<R> Buffer;
     typedef ExtensionStructure<R, S, Q, L> Structure;
-    typedef Epoch<R, S, Q, L> Epoch;
+    typedef Epoch<R, S, Q, L> _Epoch;
     typedef BufferView<R, Q> BufView;
 
 public:
@@ -53,20 +53,24 @@ public:
     {
         auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
         auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
-        auto epoch = new Epoch(vers, buf);
+        auto epoch = new _Epoch(vers, buf);
 
-        m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
-        m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop));
-        m_epochs.push_back({0, epoch});
+        m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
+        m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop));
+        m_epochs.insert({0, epoch});
     }
 
     ~DynamicExtension() {
-        for (size_t i=0; i<m_buffers.size(); i++) {
-            delete m_buffers[i];
+        for (auto e : m_buffers) {
+            delete e;
         }
 
-        for (size_t i=0; i<m_versions.size(); i++) {
-            delete m_versions[i];
+        for (auto e : m_versions) {
+            delete e;
+        }
+
+        for (auto e : m_epochs) {
+            delete e.second;
         }
     }
 
@@ -87,7 +91,7 @@ public:
          * probably has the lowest probability of having the record,
          * so we'll check it last.
          */
-        return buffers->delete_record(rec);
+        return buffers.delete_record(rec);
     }
 
     /*
@@ -97,7 +101,7 @@ public:
     }
 
     std::future<std::vector<R>> query(void *parms) {
-        return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms);
+        return schedule_query(parms);
     }
 
     size_t get_record_count() {
@@ -183,36 +187,37 @@ private:
     std::set<Structure *> m_versions;
 
     std::atomic<size_t> m_current_epoch;
-    std::unordered_map<size_t, Epoch *> m_epochs;
+    std::unordered_map<size_t, _Epoch *> m_epochs;
 
     size_t m_scale_factor;
     double m_max_delete_prop;
     size_t m_buffer_capacity;
     size_t m_buffer_delete_capacity;
 
-    Epoch *get_active_epoch() {
+    _Epoch *get_active_epoch() {
         return m_epochs[m_current_epoch.load()];
     }
 
     void advance_epoch() {
         size_t new_epoch_num = m_current_epoch.load() + 1;
-        Epoch *new_epoch = m_epochs[new_epoch_num];
-        Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+        _Epoch *new_epoch = m_epochs[new_epoch_num];
+        _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
 
         // Update the new Epoch to contain the buffers
         // from the old one that it doesn't currently have
         size_t old_buffer_cnt = new_epoch->clear_buffers();
         for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
-            new_epoch->add_buffer(old_epoch->get_buffers[i]);
+            new_epoch->add_buffer(old_epoch->get_buffers()[i]);
         }
 
         m_current_epoch.fetch_add(1);
+        retire_epoch(old_epoch);
     }
 
     /*
      * Creates a new epoch by copying the currently active one. The new epoch's
      * structure will be a shallow copy of the old one's.
      */
-    Epoch *create_new_epoch() {
+    _Epoch *create_new_epoch() {
         auto new_epoch = get_active_epoch()->clone();
         std::unique_lock<std::mutex> m_struct_lock;
         m_versions.insert(new_epoch->get_structure());
@@ -228,8 +233,8 @@ private:
      * buffer while a new epoch is being created in the background. Returns a
      * pointer to the newly created buffer.
      */
-    Buffer *add_empty_buffer(Epoch *epoch) {
-        auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+    Buffer *add_empty_buffer(_Epoch *epoch) {
+        auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
 
         std::unique_lock<std::mutex> m_struct_lock;
         m_buffers.insert(new_buffer);
@@ -239,7 +244,7 @@ private:
         return new_buffer;
     }
 
-    void retire_epoch(Epoch *epoch) {
+    void retire_epoch(_Epoch *epoch) {
        /*
         * Epochs with currently active jobs cannot
         * be retired. By the time retire_epoch is called,
@@ -294,26 +299,15 @@ private:
 
         vers->merge_buffer(buff);
 
-        args->result.set_value(true);
         args->epoch->end_job();
-        delete args;
-    }
-
-    static std::vector<R> finalize_query_result(std::vector<std::vector<Wrapped<R>>> &query_results, void *parms,
-                                                std::vector<void *> &shard_states, std::vector<void *> &buffer_states) {
-        auto result = Q::merge(query_results, parms);
-
-        for (size_t i=0; i<buffer_states.size(); i++) {
-            Q::delete_buffer_query_state(buffer_states[i]);
-        }
-
-        for (size_t i=0; i<states.size(); i++) {
-            Q::delete_query_state(shard_states[i]);
-        }
+        args->result.set_value(true);
 
-        return result;
+        ((DynamicExtension *) args->extension)->advance_epoch();
+
+        // FIXME: this might break things... not sure
+        delete args;
     }
-    
+
     static void async_query(void *arguments) {
         QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
@@ -322,58 +316,56 @@ private:
         void *parms = args->query_parms;
 
         // Get the buffer query states
-        std::vector<void *> buffer_states = buffers->get_buffer_query_states(parms);
+        std::vector<void *> buffer_states = buffers.get_query_states(parms);
 
         // Get the shard query states
         std::vector<std::pair<ShardID, Shard*>> shards;
-        std::vector<void *> shard_states = vers->get_query_states(shards, parms);
+        std::vector<void *> states = vers->get_query_states(shards, parms);
 
-        Q::process_query_states(parms, shard_states, buffer_states);
+        Q::process_query_states(parms, states, buffer_states);
 
         std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+        for (size_t i=0; i<query_results.size(); i++) {
+            std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
+                ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
+                : Q::query(shards[i - buffer_states.size()].second, states[i - buffer_states.size()], parms);
+            ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
+            query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
 
-        // Execute the query for the buffer
-        std::vector<std::vector<Wrapped<R>>> buffer_results(buffer_states.size());
-        for (size_t i=0; i<buffer_states.size(); i++) {
-            auto buffer_results = Q::buffer_query(buffers->get_buffers[i], buffer_states[i], parms);
-            query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers));
-
-            if constexpr (Q::EARLY_ABORT) {
-                if (query_results[i] > 0) {
-                    return finalize_query_result(query_results, parms, buffer_states, shard_states);
-                }
-            }
-        }
-
-        // Execute the query for each shard
-        for (size_t i=0; i<shards.size(); i++) {
-            auto shard_results = Q::query(shards[i].second, states[i], parms);
-            query_results[i+buffer_states.size()] = std::move(filter_deletes(shard_results, shards[i].first, buffers, vers));
             if constexpr (Q::EARLY_ABORT) {
-                if (query_results[i+buffer_states.size()].size() > 0) {
-                    return finalize_query_result(query_results, parms, buffer_states, shard_states);
-                }
+                if (query_results[i].size() > 0) break;
             }
         }
-
-        // Merge the results together and finalize the job
-        auto result = finalize_query_result(query_results, parms, buffer_states, shard_states);
+
+        auto result = Q::merge(query_results, parms);
        args->result_set.set_value(std::move(result));
 
         args->epoch->end_job();
+
+        for (size_t i=0; i<buffer_states.size(); i++) {
+            Q::delete_buffer_query_state(buffer_states[i]);
+        }
+
+        for (size_t i=0; i<states.size(); i++) {
+            Q::delete_query_state(states[i]);
+        }
+
+        // FIXME: this might break things... not sure
         delete args;
     }
 
-    std::future<bool> schedule_merge() {
-        auto epoch = get_active_epoch();
+    void schedule_merge() {
+        auto epoch = create_new_epoch();
         epoch->start_job();
 
         MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
         args->epoch = epoch;
-        args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]);
+        // FIXME: all full buffers can be merged at this point--but that requires
+        //        retooling the shard interface a bit to do efficiently.
+        args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
+        args->extension = this;
         m_sched.schedule_job(merge, 0, args);
-
-        return args->result.get_future();
     }
 
     std::future<std::vector<R>> schedule_query(void *query_parms) {
@@ -389,20 +381,29 @@ private:
     }
 
     int internal_append(const R &rec, bool ts) {
-        Buffer *buffer;
-        while (!(buffer = get_active_epoch()->get_active_buffer()))
-            ;
-
-        if (buffer->is_full()) {
-            auto vers = get_active_epoch()->get_structure();
-            auto res = schedule_merge(vers, buffer);
-            res.get();
-        }
+        Buffer *buffer = nullptr;
+        do {
+            auto epoch = get_active_epoch();
+
+            while (!(buffer = epoch->get_active_buffer()))
+                ;
+
+            /* if the buffer is full, schedule a merge and add a new empty buffer */
+            if (buffer->is_full()) {
+                // FIXME: possible race here--two identical merges could be scheduled
+                auto vers = epoch->get_structure();
+                schedule_merge();
+                buffer = add_empty_buffer(epoch);
+            }
+
+        } while(!buffer->append(rec, ts));
 
-        return buffer->append(rec, ts);
+        /* internal append should always succeed, eventually */
+        return 1;
     }
 
-    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, BufView *buffers, Structure *vers) {
+    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
+                                                  BufView &buffers, Structure *vers) {
         if constexpr (!Q::SKIP_DELETE_FILTER) {
             return records;
         }
@@ -431,7 +432,7 @@ private:
                 continue;
             }
 
-            if (buffers->check_tombstone(rec.rec)) {
+            if (buffers.check_tombstone(rec.rec)) {
                 continue;
             }
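
The core of this change is the epoch lifecycle visible above: a query or merge pins the epoch it runs against with `start_job()` and releases it with `end_job()`, `advance_epoch()` publishes the successor epoch (handing over any buffers the old one accumulated) and then calls the new `retire_epoch()`, which may only reclaim an epoch once no job still references it. The sketch below is a minimal illustration of that reference-counting idea, assuming a plain atomic job counter; it is not the repository's `Epoch<R, S, Q, L>` class, and everything beyond the `start_job`/`end_job`/`retire_epoch` names taken from the diff is an assumption.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>

// Minimal sketch (assumption): an epoch that counts in-flight jobs so it
// can be reclaimed safely, mirroring the start_job()/end_job() calls in
// the diff. The real Epoch also owns a Structure and a set of Buffers.
struct EpochSketch {
    std::atomic<size_t> m_active_jobs{0};

    void start_job() { m_active_jobs.fetch_add(1); } // pin before using the epoch's data
    void end_job()   { m_active_jobs.fetch_sub(1); } // unpin once the job finishes
};

// An epoch with active jobs cannot be retired, the same rule stated in the
// retire_epoch() comment above. This sketch spins until in-flight jobs
// drain; a production version would block on a condition variable or defer
// the delete to a background thread.
void retire_epoch_sketch(EpochSketch *epoch) {
    while (epoch->m_active_jobs.load() > 0) {
        std::this_thread::yield();
    }
    delete epoch; // safe: no query or merge can still observe this epoch
}
```

One hazard this pattern has in general, and which the `FIXME` notes in the diff gesture toward: between looking up the active epoch and calling `start_job()` on it, another thread can advance and retire that epoch, so the lookup and the pin ideally happen under a shared lock or as one atomic step.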
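
The rewritten `internal_append()` replaces the old block-until-merge behavior with a retry loop: fetch the active epoch, wait for an active buffer, and if that buffer is full, kick off a background merge, attach a fresh empty buffer, and retry the append until one lands. Below is a single-threaded sketch of that retry shape, using a hypothetical `FlatBuffer` in place of the framework's `MutableBuffer`; the names and structure here are illustrative assumptions, not the repository's API.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical fixed-capacity buffer standing in for MutableBuffer:
// append() fails instead of blocking once the buffer is full.
struct FlatBuffer {
    size_t cap;
    std::vector<int> slots;

    explicit FlatBuffer(size_t capacity) : cap(capacity) {}

    bool append(int rec) {
        if (slots.size() >= cap) {
            return false; // full: caller must retry against a fresh buffer
        }
        slots.push_back(rec);
        return true;
    }
};

// Retry loop with the same shape as the new internal_append(): a failed
// append means the buffer filled up, so hand off a fresh one and try again.
// In the framework the handoff is schedule_merge() + add_empty_buffer(),
// and the old buffer stays alive until its merge completes; this sketch
// simply replaces it.
int append_with_retry(FlatBuffer *&buffer, int rec, size_t cap) {
    while (!buffer->append(rec)) {
        delete buffer; // sketch only: the real code retires the old buffer via its epoch
        buffer = new FlatBuffer(cap);
    }
    return 1; // like internal_append(), success is eventual by construction
}
```

As in the diff, the return value is a constant 1: the loop only exits once some append has succeeded, so failure shows up as not returning yet rather than as an error code.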