1717
1818import  io .serverlessworkflow .api .types .CatchErrors ;
1919import  io .serverlessworkflow .api .types .ErrorFilter ;
20+ import  io .serverlessworkflow .api .types .Retry ;
21+ import  io .serverlessworkflow .api .types .RetryBackoff ;
22+ import  io .serverlessworkflow .api .types .RetryPolicy ;
2023import  io .serverlessworkflow .api .types .TaskItem ;
2124import  io .serverlessworkflow .api .types .TryTask ;
2225import  io .serverlessworkflow .api .types .TryTaskCatch ;
2932import  io .serverlessworkflow .impl .WorkflowMutablePosition ;
3033import  io .serverlessworkflow .impl .WorkflowPredicate ;
3134import  io .serverlessworkflow .impl .WorkflowUtils ;
35+ import  io .serverlessworkflow .impl .executors .retry .ConstantRetryIntervalFunction ;
36+ import  io .serverlessworkflow .impl .executors .retry .DefaultRetryExecutor ;
37+ import  io .serverlessworkflow .impl .executors .retry .ExponentialRetryIntervalFunction ;
38+ import  io .serverlessworkflow .impl .executors .retry .LinearRetryIntervalFunction ;
39+ import  io .serverlessworkflow .impl .executors .retry .RetryExecutor ;
40+ import  io .serverlessworkflow .impl .executors .retry .RetryIntervalFunction ;
3241import  java .util .List ;
3342import  java .util .Optional ;
3443import  java .util .concurrent .CompletableFuture ;
@@ -42,6 +51,7 @@ public class TryExecutor extends RegularTaskExecutor<TryTask> {
4251  private  final  Optional <Predicate <WorkflowError >> errorFilter ;
4352  private  final  TaskExecutor <?> taskExecutor ;
4453  private  final  Optional <TaskExecutor <?>> catchTaskExecutor ;
54+   private  final  Optional <RetryExecutor > retryIntervalExecutor ;
4555
4656  public  static  class  TryExecutorBuilder  extends  RegularTaskExecutorBuilder <TryTask > {
4757
@@ -50,6 +60,7 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder<TryTas
5060    private  final  Optional <Predicate <WorkflowError >> errorFilter ;
5161    private  final  TaskExecutor <?> taskExecutor ;
5262    private  final  Optional <TaskExecutor <?>> catchTaskExecutor ;
63+     private  final  Optional <RetryExecutor > retryIntervalExecutor ;
5364
5465    protected  TryExecutorBuilder (
5566        WorkflowMutablePosition  position , TryTask  task , WorkflowDefinition  definition ) {
@@ -60,13 +71,63 @@ protected TryExecutorBuilder(
6071      this .exceptFilter  = WorkflowUtils .optionalPredicate (application , catchInfo .getExceptWhen ());
6172      this .taskExecutor  =
6273          TaskExecutorHelper .createExecutorList (position , task .getTry (), definition );
63-       List <TaskItem > catchTask  = task .getCatch ().getDo ();
64-       this .catchTaskExecutor  =
65-           catchTask  != null  && !catchTask .isEmpty ()
66-               ? Optional .of (
67-                   TaskExecutorHelper .createExecutorList (
68-                       position , task .getCatch ().getDo (), definition ))
69-               : Optional .empty ();
74+       TryTaskCatch  catchTask  = task .getCatch ();
75+       if  (catchTask  != null ) {
76+         List <TaskItem > catchTaskDo  = catchTask .getDo ();
77+ 
78+         this .catchTaskExecutor  =
79+             catchTaskDo  != null  && !catchTaskDo .isEmpty ()
80+                 ? Optional .of (
81+                     TaskExecutorHelper .createExecutorList (position , catchTaskDo , definition ))
82+                 : Optional .empty ();
83+ 
84+         Retry  retry  = catchTask .getRetry ();
85+         this .retryIntervalExecutor  = retry  != null  ? buildRetryInterval (retry ) : Optional .empty ();
86+       } else  {
87+         this .catchTaskExecutor  = Optional .empty ();
88+         this .retryIntervalExecutor  = Optional .empty ();
89+       }
90+     }
91+ 
92+     private  Optional <RetryExecutor > buildRetryInterval (Retry  retry ) {
93+       RetryPolicy  retryPolicy  = null ;
94+       if  (retry .getRetryPolicyDefinition () != null ) {
95+         retryPolicy  = retry .getRetryPolicyDefinition ();
96+       } else  if  (retry .getRetryPolicyReference () != null ) {
97+         retryPolicy  =
98+             workflow 
99+                 .getUse ()
100+                 .getRetries ()
101+                 .getAdditionalProperties ()
102+                 .get (retry .getRetryPolicyReference ());
103+         if  (retryPolicy  == null ) {
104+           throw  new  IllegalStateException ("Retry policy "  + retryPolicy  + " was not found" );
105+         }
106+       }
107+       return  retryPolicy  != null  ? Optional .of (buildRetryExecutor (retryPolicy )) : Optional .empty ();
108+     }
109+ 
110+     protected  RetryExecutor  buildRetryExecutor (RetryPolicy  retryPolicy ) {
111+       return  new  DefaultRetryExecutor (
112+           retryPolicy .getLimit ().getAttempt ().getCount (),
113+           buildIntervalFunction (retryPolicy ),
114+           WorkflowUtils .optionalPredicate (application , retryPolicy .getWhen ()),
115+           WorkflowUtils .optionalPredicate (application , retryPolicy .getExceptWhen ()));
116+     }
117+ 
118+     private  RetryIntervalFunction  buildIntervalFunction (RetryPolicy  retryPolicy ) {
119+       RetryBackoff  backoff  = retryPolicy .getBackoff ();
120+       if  (backoff .getConstantBackoff () != null ) {
121+         return  new  ConstantRetryIntervalFunction (
122+             application , retryPolicy .getDelay (), retryPolicy .getJitter ());
123+       } else  if  (backoff .getLinearBackoff () != null ) {
124+         return  new  LinearRetryIntervalFunction (
125+             application , retryPolicy .getDelay (), retryPolicy .getJitter ());
126+       } else  if  (backoff .getExponentialBackOff () != null ) {
127+         return  new  ExponentialRetryIntervalFunction (
128+             application , retryPolicy .getDelay (), retryPolicy .getJitter ());
129+       }
130+       throw  new  IllegalStateException ("A backoff strategy should be set" );
70131    }
71132
72133    @ Override 
@@ -82,13 +143,19 @@ protected TryExecutor(TryExecutorBuilder builder) {
82143    this .exceptFilter  = builder .exceptFilter ;
83144    this .taskExecutor  = builder .taskExecutor ;
84145    this .catchTaskExecutor  = builder .catchTaskExecutor ;
146+     this .retryIntervalExecutor  = builder .retryIntervalExecutor ;
85147  }
86148
87149  @ Override 
88150  protected  CompletableFuture <WorkflowModel > internalExecute (
89151      WorkflowContext  workflow , TaskContext  taskContext ) {
152+     return  doIt (workflow , taskContext , taskContext .input ());
153+   }
154+ 
155+   private  CompletableFuture <WorkflowModel > doIt (
156+       WorkflowContext  workflow , TaskContext  taskContext , WorkflowModel  model ) {
90157    return  TaskExecutorHelper .processTaskList (
91-             taskExecutor , workflow , Optional .of (taskContext ), taskContext . input () )
158+             taskExecutor , workflow , Optional .of (taskContext ), model )
92159        .exceptionallyCompose (e  -> handleException (e , workflow , taskContext ));
93160  }
94161
@@ -99,17 +166,27 @@ private CompletableFuture<WorkflowModel> handleException(
99166    }
100167    if  (e  instanceof  WorkflowException ) {
101168      WorkflowException  exception  = (WorkflowException ) e ;
169+       CompletableFuture <WorkflowModel > completable  =
170+           CompletableFuture .completedFuture (taskContext .rawOutput ());
102171      if  (errorFilter .map (f  -> f .test (exception .getWorkflowError ())).orElse (true )
103-           && whenFilter .map (w  -> w .test (workflow , taskContext , taskContext .input ())).orElse (true )
104-           && exceptFilter 
105-               .map (w  -> !w .test (workflow , taskContext , taskContext .input ()))
106-               .orElse (true )) {
172+           && WorkflowUtils .whenExceptTest (
173+               whenFilter , exceptFilter , workflow , taskContext , taskContext .rawOutput ())) {
107174        if  (catchTaskExecutor .isPresent ()) {
108-           return  TaskExecutorHelper .processTaskList (
109-               catchTaskExecutor .get (), workflow , Optional .of (taskContext ), taskContext .input ());
175+           completable  =
176+               completable .thenCompose (
177+                   model  ->
178+                       TaskExecutorHelper .processTaskList (
179+                           catchTaskExecutor .get (), workflow , Optional .of (taskContext ), model ));
180+         }
181+         if  (retryIntervalExecutor .isPresent ()) {
182+           completable  =
183+               completable 
184+                   .thenCompose (
185+                       model  -> retryIntervalExecutor .get ().retry (workflow , taskContext , model ))
186+                   .thenCompose (model  -> doIt (workflow , taskContext , model ));
110187        }
111188      }
112-       return  CompletableFuture . completedFuture ( taskContext . rawOutput ()) ;
189+       return  completable ;
113190    } else  {
114191      if  (e  instanceof  RuntimeException ) {
115192        throw  (RuntimeException ) e ;
0 commit comments