diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-04-19 17:38:16 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-04-19 17:38:16 -0400 |
| commit | 438feac7e56fee425d9c6f1a43298ff9dc5b71d1 (patch) | |
| tree | 986ea9a630494be1af6bdf8ccb7639b6f3934576 /include/framework | |
| parent | 8479f3ce863dfb6d3b20ff4678fa6fe92ee86b52 (diff) | |
| download | dynamic-extension-438feac7e56fee425d9c6f1a43298ff9dc5b71d1.tar.gz | |
Properly implemented support for iteratively decomposable problems
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 40 | ||||
| -rw-r--r-- | include/framework/interface/Query.h | 6 |
2 files changed, 26 insertions, 20 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index b154be8..6fd95c6 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -546,30 +546,34 @@ private: std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void *> states = vers->get_query_states(shards, parms); + std::vector<R> results; Q::process_query_states(parms, states, buffer_state); - std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); - for (size_t i=0; i<query_results.size(); i++) { - std::vector<Wrapped<R>> local_results; - ShardID shid; - - if (i == 0) { /* process the buffer first */ - local_results = Q::buffer_query(buffer_state, parms); - shid = INVALID_SHID; - } else { - local_results = Q::query(shards[i - 1].second, states[i - 1], parms); - shid = shards[i - 1].first; - } + do { + std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); + for (size_t i=0; i<query_results.size(); i++) { + std::vector<Wrapped<R>> local_results; + ShardID shid; + + if (i == 0) { /* process the buffer first */ + local_results = Q::buffer_query(buffer_state, parms); + shid = INVALID_SHID; + } else { + local_results = Q::query(shards[i - 1].second, states[i - 1], parms); + shid = shards[i - 1].first; + } - query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); + query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) break; + if constexpr (Q::EARLY_ABORT) { + if (query_results[i].size() > 0) break; + } } - } + Q::merge(query_results, parms, results); + + } while (Q::repeat(parms, results, states, buffer_state)); - auto result = Q::merge(query_results, parms); - args->result_set.set_value(std::move(result)); + args->result_set.set_value(std::move(results)); ((DynamicExtension *) args->extension)->end_job(epoch); diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 3d487f0..577d6cd 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -13,17 +13,19 @@ namespace de{ template <typename Q, typename R, typename S> -concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) { +concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv, std::vector<R> &resv) { {Q::get_query_state(sh, p)} -> std::convertible_to<void*>; {Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>; {Q::process_query_states(p, s, p)}; {Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>; {Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>; - {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>; + {Q::merge(rv, p, resv)}; {Q::delete_query_state(p)} -> std::same_as<void>; {Q::delete_buffer_query_state(p)} -> std::same_as<void>; + {Q::repeat(p, resv, s, p)} -> std::same_as<bool>; + {Q::EARLY_ABORT} -> std::convertible_to<bool>; {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>; }; |