From 39ae3e0441d8297a09197aba98bd494b5ada12c1 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 14:17:59 -0400 Subject: Concurrency updates + fixes for compile errors --- include/framework/DynamicExtension.h | 161 ++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 80 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6936247..d2a6b7a 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -39,7 +39,7 @@ class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; typedef ExtensionStructure Structure; - typedef Epoch Epoch; + typedef Epoch _Epoch; typedef BufferView BufView; public: @@ -53,20 +53,24 @@ public: { auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new Epoch(vers, buf); + auto epoch = new _Epoch(vers, buf); - m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); - m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop)); - m_epochs.push_back({0, epoch}); + m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap)); + m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop)); + m_epochs.insert({0, epoch}); } ~DynamicExtension() { - for (size_t i=0; idelete_record(rec); + return buffers.delete_record(rec); } /* @@ -97,7 +101,7 @@ public: } std::future> query(void *parms) { - return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms); + return schedule_query(parms); } size_t get_record_count() { @@ -183,36 +187,37 @@ private: std::set m_versions; std::atomic m_current_epoch; - std::unordered_map m_epochs; + std::unordered_map m_epochs; size_t m_scale_factor; double m_max_delete_prop; size_t m_buffer_capacity; size_t m_buffer_delete_capacity; - Epoch *get_active_epoch() { + _Epoch *get_active_epoch() { return m_epochs[m_current_epoch.load()]; } void advance_epoch() { size_t new_epoch_num = m_current_epoch.load() + 1; - Epoch *new_epoch = m_epochs[new_epoch_num]; - Epoch *old_epoch = m_epochs[m_current_epoch.load()]; + _Epoch *new_epoch = m_epochs[new_epoch_num]; + _Epoch *old_epoch = m_epochs[m_current_epoch.load()]; // Update the new Epoch to contain the buffers // from the old one that it doesn't currently have size_t old_buffer_cnt = new_epoch->clear_buffers(); for (size_t i=old_buffer_cnt; iget_buffers().size(); i++) { - new_epoch->add_buffer(old_epoch->get_buffers[i]); + new_epoch->add_buffer(old_epoch->get_buffers()[i]); } m_current_epoch.fetch_add(1); + retire_epoch(old_epoch); } /* * Creates a new epoch by copying the currently active one. The new epoch's * structure will be a shallow copy of the old one's. */ - Epoch *create_new_epoch() { + _Epoch *create_new_epoch() { auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); @@ -228,8 +233,8 @@ private: * buffer while a new epoch is being created in the background. Returns a * pointer to the newly created buffer. */ - Buffer *add_empty_buffer(Epoch *epoch) { - auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity); + Buffer *add_empty_buffer(_Epoch *epoch) { + auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); std::unique_lock m_struct_lock; m_buffers.insert(new_buffer); @@ -239,7 +244,7 @@ private: return new_buffer; } - void retire_epoch(Epoch *epoch) { + void retire_epoch(_Epoch *epoch) { /* * Epochs with currently active jobs cannot * be retired. By the time retire_epoch is called, @@ -294,26 +299,15 @@ private: vers->merge_buffer(buff); - args->result.set_value(true); args->epoch->end_job(); - delete args; - } - - static std::vector finalize_query_result(std::vector>> &query_results, void *parms, - std::vector &shard_states, std::vector &buffer_states) { - auto result = Q::merge(query_results, parms); - - for (size_t i=0; iresult.set_value(true); - return result; + ((DynamicExtension *) args->extension)->advance_epoch(); + + // FIXME: this might break things... not sure + delete args; } - + static void async_query(void *arguments) { QueryArgs *args = (QueryArgs *) arguments; @@ -322,58 +316,56 @@ private: void *parms = args->query_parms; // Get the buffer query states - std::vector buffer_states = buffers->get_buffer_query_states(parms); + std::vector buffer_states = buffers.get_query_states(parms); // Get the shard query states std::vector> shards; - std::vector shard_states = vers->get_query_states(shards, parms); + std::vector states = vers->get_query_states(shards, parms); - Q::process_query_states(parms, shard_states, buffer_states); + Q::process_query_states(parms, states, buffer_states); std::vector>> query_results(shards.size() + buffer_states.size()); + for (size_t i=0; i> local_results = (i < buffer_states.size()) + ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms) + : Q::query(shards[i - buffer_states.size()].second, + states[i - buffer_states.size()], parms); + ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first; + query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers)); - // Execute the query for the buffer - std::vector>> buffer_results(buffer_states.size()); - for (size_t i=0; iget_buffers[i], buffer_states[i], parms); - query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers)); - - if constexpr (Q::EARLY_ABORT) { - if (query_results[i] > 0) { - return finalize_query_result(query_results, parms, buffer_states, shard_states); - } - } - } - - // Execute the query for each shard - for (size_t i=0; i 0) { - return finalize_query_result(query_results, parms, buffer_states, shard_states); - } + if (query_results[i].size() > 0) break; } } - - // Merge the results together and finalize the job - auto result = finalize_query_result(query_results, parms, buffer_states, shard_states); + + auto result = Q::merge(query_results, parms); args->result_set.set_value(std::move(result)); args->epoch->end_job(); + + for (size_t i=0; i schedule_merge() { - auto epoch = get_active_epoch(); + void schedule_merge() { + auto epoch = create_new_epoch(); epoch->start_job(); MergeArgs *args = new MergeArgs(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]); + // FIXME: all full buffers can be merged at this point--but that requires + // retooling the shard interface a bit to do efficiently. + args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count()); + args->extension = this; m_sched.schedule_job(merge, 0, args); - - return args->result.get_future(); } std::future> schedule_query(void *query_parms) { @@ -389,20 +381,29 @@ private: } int internal_append(const R &rec, bool ts) { - Buffer *buffer; - while (!(buffer = get_active_epoch()->get_active_buffer())) - ; - - if (buffer->is_full()) { - auto vers = get_active_epoch()->get_structure(); - auto res = schedule_merge(vers, buffer); - res.get(); - } + Buffer *buffer = nullptr; + do { + auto epoch = get_active_epoch(); + + while (!(buffer = epoch->get_active_buffer())) + ; + + /* if the buffer is full, schedule a merge and add a new empty buffer */ + if (buffer->is_full()) { + // FIXME: possible race here--two identical merges could be scheduled + auto vers = epoch->get_structure(); + schedule_merge(); + buffer = add_empty_buffer(epoch); + } + + } while(!buffer->append(rec, ts)); - return buffer->append(rec, ts); + /* internal append should always succeed, eventually */ + return 1; } - static std::vector> filter_deletes(std::vector> &records, ShardID shid, BufView *buffers, Structure *vers) { + static std::vector> filter_deletes(std::vector> &records, ShardID shid, + BufView &buffers, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -431,7 +432,7 @@ private: continue; } - if (buffers->check_tombstone(rec.rec)) { + if (buffers.check_tombstone(rec.rec)) { continue; } -- cgit v1.2.3