summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-04-19 17:38:16 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2024-04-19 17:38:16 -0400
commit438feac7e56fee425d9c6f1a43298ff9dc5b71d1 (patch)
tree986ea9a630494be1af6bdf8ccb7639b6f3934576 /include/framework
parent8479f3ce863dfb6d3b20ff4678fa6fe92ee86b52 (diff)
downloaddynamic-extension-438feac7e56fee425d9c6f1a43298ff9dc5b71d1.tar.gz
Properly implemented support for iteratively decomposable problems
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h40
-rw-r--r--include/framework/interface/Query.h6
2 files changed, 26 insertions, 20 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index b154be8..6fd95c6 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -546,30 +546,34 @@ private:
std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void *> states = vers->get_query_states(shards, parms);
+ std::vector<R> results;
Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
- for (size_t i=0; i<query_results.size(); i++) {
- std::vector<Wrapped<R>> local_results;
- ShardID shid;
-
- if (i == 0) { /* process the buffer first */
- local_results = Q::buffer_query(buffer_state, parms);
- shid = INVALID_SHID;
- } else {
- local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
- shid = shards[i - 1].first;
- }
+ do {
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+ for (size_t i=0; i<query_results.size(); i++) {
+ std::vector<Wrapped<R>> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* process the buffer first */
+ local_results = Q::buffer_query(buffer_state, parms);
+ shid = INVALID_SHID;
+ } else {
+ local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+ shid = shards[i - 1].first;
+ }
- query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) break;
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) break;
+ }
}
- }
+ Q::merge(query_results, parms, results);
+
+ } while (Q::repeat(parms, results, states, buffer_state));
- auto result = Q::merge(query_results, parms);
- args->result_set.set_value(std::move(result));
+ args->result_set.set_value(std::move(results));
((DynamicExtension *) args->extension)->end_job(epoch);
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 3d487f0..577d6cd 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -13,17 +13,19 @@
namespace de{
template <typename Q, typename R, typename S>
-concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) {
+concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv, std::vector<R> &resv) {
{Q::get_query_state(sh, p)} -> std::convertible_to<void*>;
{Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>;
{Q::process_query_states(p, s, p)};
{Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
{Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
- {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>;
+ {Q::merge(rv, p, resv)};
{Q::delete_query_state(p)} -> std::same_as<void>;
{Q::delete_buffer_query_state(p)} -> std::same_as<void>;
+ {Q::repeat(p, resv, s, p)} -> std::same_as<bool>;
+
{Q::EARLY_ABORT} -> std::convertible_to<bool>;
{Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
};