1515 */
1616package io .serverlessworkflow .impl .executors .openapi ;
1717
18- import static io .serverlessworkflow .impl .executors .http .HttpExecutor .getTargetSupplier ;
19-
2018import io .serverlessworkflow .api .types .CallHTTP ;
2119import io .serverlessworkflow .api .types .CallOpenAPI ;
20+ import io .serverlessworkflow .api .types .OpenAPIArguments ;
2221import io .serverlessworkflow .api .types .TaskBase ;
23- import io .serverlessworkflow .api .types .Workflow ;
2422import io .serverlessworkflow .impl .TaskContext ;
25- import io .serverlessworkflow .impl .WorkflowApplication ;
2623import io .serverlessworkflow .impl .WorkflowContext ;
2724import io .serverlessworkflow .impl .WorkflowDefinition ;
28- import io .serverlessworkflow .impl .WorkflowException ;
2925import io .serverlessworkflow .impl .WorkflowModel ;
3026import io .serverlessworkflow .impl .executors .CallableTask ;
3127import io .serverlessworkflow .impl .executors .http .HttpExecutor ;
32- import io .serverlessworkflow .impl .executors .http .TargetSupplier ;
33- import io .serverlessworkflow .impl .resources .ResourceLoader ;
28+ import java .util .Iterator ;
3429import java .util .concurrent .CompletableFuture ;
3530import java .util .stream .Collectors ;
3631
3732public class OpenAPIExecutor implements CallableTask <CallOpenAPI > {
3833
39- private CallOpenAPI task ;
40- private Workflow workflow ;
41- private WorkflowDefinition definition ;
42- private WorkflowApplication application ;
43- private TargetSupplier targetSupplier ;
44- private ResourceLoader resourceLoader ;
4534 private OperationDefinitionSupplier operationDefinitionSupplier ;
35+ private OpenAPIArguments with ;
4636
4737 @ Override
4838 public boolean accept (Class <? extends TaskBase > clazz ) {
@@ -55,43 +45,39 @@ public CompletableFuture<WorkflowModel> apply(
5545
5646 OperationDefinition operation =
5747 operationDefinitionSupplier .get (workflowContext , taskContext , input );
48+ HttpCallAdapter httpCallAdapter =
49+ getHttpCallAdapter (operation , workflowContext , taskContext , input );
5850
59- return CompletableFuture .supplyAsync (
60- () -> {
61- HttpCallAdapter httpCallAdapter =
62- getHttpCallAdapter (operation , workflowContext , taskContext , input );
63-
64- WorkflowException workflowException = null ;
65-
66- for (var server : operation .getServers ()) {
67- CallHTTP callHTTP = httpCallAdapter .server (server ).build ();
68- HttpExecutor executor = new HttpExecutor ();
69- executor .init (callHTTP , definition );
51+ Iterator <String > iter = operation .getServers ().iterator ();
52+ if (!iter .hasNext ()) {
53+ throw new IllegalArgumentException (
54+ "List of servers is empty for operation " + operation .getOperation ());
55+ }
56+ CompletableFuture <WorkflowModel > future =
57+ executeServer (iter .next (), httpCallAdapter , workflowContext , taskContext , input );
58+ while (iter .hasNext ()) {
59+ future .exceptionallyCompose (
60+ i -> executeServer (iter .next (), httpCallAdapter , workflowContext , taskContext , input ));
61+ }
62+ return future ;
63+ }
7064
71- try {
72- return executor .apply (workflowContext , taskContext , input ).get ();
73- } catch (WorkflowException e ) {
74- workflowException = e ;
75- } catch (Exception e ) {
76- throw new RuntimeException (e );
77- }
78- }
79- throw workflowException ; // if we there, we failed all servers and ex is not null
80- },
81- workflowContext .definition ().application ().executorService ());
65+ private CompletableFuture <WorkflowModel > executeServer (
66+ String server ,
67+ HttpCallAdapter callAdapter ,
68+ WorkflowContext workflowContext ,
69+ TaskContext taskContext ,
70+ WorkflowModel input ) {
71+ CallHTTP callHTTP = callAdapter .server (server ).build ();
72+ HttpExecutor executor = new HttpExecutor ();
73+ executor .init (callHTTP , workflowContext .definition ());
74+ return executor .apply (workflowContext , taskContext , input );
8275 }
8376
8477 @ Override
8578 public void init (CallOpenAPI task , WorkflowDefinition definition ) {
86- this .task = task ;
87- this .definition = definition ;
88- this .workflow = definition .workflow ();
89- this .application = definition .application ();
90- this .resourceLoader = definition .resourceLoader ();
91- this .operationDefinitionSupplier = new OperationDefinitionSupplier (application , task );
92- this .targetSupplier =
93- getTargetSupplier (
94- task .getWith ().getDocument ().getEndpoint (), application .expressionFactory ());
79+ with = task .getWith ();
80+ operationDefinitionSupplier = new OperationDefinitionSupplier (definition .application (), with );
9581 }
9682
9783 private HttpCallAdapter getHttpCallAdapter (
@@ -102,11 +88,11 @@ private HttpCallAdapter getHttpCallAdapter(
10288 OperationPathResolver pathResolver =
10389 new OperationPathResolver (
10490 operation .getPath (),
105- application ,
106- task . getWith () .getParameters ().getAdditionalProperties ());
91+ workflowContext . definition (). application () ,
92+ with .getParameters ().getAdditionalProperties ());
10793
10894 return new HttpCallAdapter ()
109- .auth (task . getWith () .getAuthentication ())
95+ .auth (with .getAuthentication ())
11096 .body (operation .getBody ())
11197 .contentType (operation .getContentType ())
11298 .headers (
@@ -118,8 +104,8 @@ private HttpCallAdapter getHttpCallAdapter(
118104 operation .getParameters ().stream ()
119105 .filter (p -> "query" .equals (p .getIn ()))
120106 .collect (Collectors .toUnmodifiableSet ()))
121- .redirect (task . getWith () .isRedirect ())
107+ .redirect (with .isRedirect ())
122108 .target (pathResolver .resolve (workflowContext , taskContext , input ))
123- .workflowParams (task . getWith () .getParameters ().getAdditionalProperties ());
109+ .workflowParams (with .getParameters ().getAdditionalProperties ());
124110 }
125111}
0 commit comments