22
33import java .util .*;
44import java .util .concurrent .*;
5+ import java .util .stream .Collectors ;
56
67import org .slf4j .Logger ;
78import org .slf4j .LoggerFactory ;
89
910import io .fabric8 .kubernetes .api .model .HasMetadata ;
10- import io .javaoperatorsdk .operator .AggregatedOperatorException ;
1111import io .javaoperatorsdk .operator .api .reconciler .Context ;
1212import io .javaoperatorsdk .operator .api .reconciler .dependent .Deleter ;
1313import io .javaoperatorsdk .operator .api .reconciler .dependent .DependentResource ;
@@ -20,17 +20,14 @@ public class WorkflowReconcileExecutor<P extends HasMetadata> {
2020
2121 private final Workflow <P > workflow ;
2222
23- private final Set <DependentResourceNode <?, ?>> alreadyReconciled = ConcurrentHashMap .newKeySet ();
24- private final Set <DependentResourceNode <?, ?>> errored = ConcurrentHashMap .newKeySet ();
25- private final Set <DependentResourceNode <?, ?>> notReady =
26- ConcurrentHashMap .newKeySet ();
27- private final Set <DependentResourceNode <?, ?>> reconcileConditionOrParentsConditionNotMet =
28- ConcurrentHashMap .newKeySet ();
29-
23+ private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
24+ private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
25+ private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
26+ new HashSet <>();
3027 private final Map <DependentResourceNode <?, ?>, Future <?>> actualExecutions =
31- new ConcurrentHashMap <>();
32- private final List < Exception > exceptionsDuringExecution =
33- Collections . synchronizedList ( new ArrayList <>() );
28+ new HashMap <>();
29+ private final Map < DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
30+ new HashMap <>();
3431
3532 private final P primary ;
3633 private final Context <P > context ;
@@ -42,18 +39,14 @@ public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> con
4239 }
4340
4441 // add reconcile results
45- public synchronized void reconcile () {
42+ public synchronized WorkflowExecutionResult reconcile () {
4643 for (DependentResourceNode <?, P > dependentResourceNode : workflow
4744 .getTopLevelDependentResources ()) {
4845 handleReconcile (dependentResourceNode , false );
4946 }
5047 while (true ) {
5148 try {
5249 this .wait ();
53- if (!exceptionsDuringExecution .isEmpty ()) {
54- log .debug ("Exception during reconciliation for: {}" , primary );
55- throw createFinalException ();
56- }
5750 if (noMoreExecutionsScheduled ()) {
5851 break ;
5952 } else {
@@ -64,6 +57,7 @@ public synchronized void reconcile() {
6457 Thread .currentThread ().interrupt ();
6558 }
6659 }
60+ return createReconcileResult ();
6761 }
6862
6963 private synchronized void handleReconcile (
@@ -80,7 +74,7 @@ private synchronized void handleReconcile(
8074 }
8175
8276 if (onlyReconcileForPossibleDelete ) {
83- reconcileConditionOrParentsConditionNotMet .add (dependentResourceNode );
77+ ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
8478 } else {
8579 dependentResourceNode .getReconcileCondition ()
8680 .ifPresent (reconcileCondition -> handleReconcileCondition (dependentResourceNode ,
@@ -98,8 +92,7 @@ private synchronized void handleReconcile(
9892
9993 private synchronized void handleExceptionInExecutor (DependentResourceNode dependentResourceNode ,
10094 RuntimeException e ) {
101- exceptionsDuringExecution .add (e );
102- errored .add (dependentResourceNode );
95+ exceptionsDuringExecution .put (dependentResourceNode , e );
10396 }
10497
10598 private synchronized void handleNodeExecutionFinish (DependentResourceNode dependentResourceNode ) {
@@ -125,9 +118,9 @@ private synchronized void setAlreadyReconciledButNotReady(
125118
126119 private boolean ownOrParentsReconcileConditionNotMet (
127120 DependentResourceNode <?, ?> dependentResourceNode ) {
128- return reconcileConditionOrParentsConditionNotMet .contains (dependentResourceNode ) ||
121+ return ownOrAncestorReconcileConditionConditionNotMet .contains (dependentResourceNode ) ||
129122 dependentResourceNode .getDependsOn ().stream ()
130- .anyMatch (reconcileConditionOrParentsConditionNotMet ::contains );
123+ .anyMatch (ownOrAncestorReconcileConditionConditionNotMet ::contains );
131124 }
132125
133126 private class NodeExecutor implements Runnable {
@@ -196,10 +189,6 @@ private boolean noMoreExecutionsScheduled() {
196189 return actualExecutions .isEmpty ();
197190 }
198191
199- private AggregatedOperatorException createFinalException () {
200- return new AggregatedOperatorException ("Exception during workflow." , exceptionsDuringExecution );
201- }
202-
203192 private boolean alreadyReconciled (
204193 DependentResourceNode <?, ?> dependentResourceNode ) {
205194 return alreadyReconciled .contains (dependentResourceNode );
@@ -211,7 +200,7 @@ private void handleReconcileCondition(DependentResourceNode<?, ?> dependentResou
211200 boolean conditionMet =
212201 reconcileCondition .isMet (dependentResourceNode .getDependentResource (), primary , context );
213202 if (!conditionMet ) {
214- reconcileConditionOrParentsConditionNotMet .add (dependentResourceNode );
203+ ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
215204 }
216205 }
217206
@@ -226,6 +215,29 @@ private boolean hasErroredParent(
226215 DependentResourceNode <?, ?> dependentResourceNode ) {
227216 return !dependentResourceNode .getDependsOn ().isEmpty ()
228217 && dependentResourceNode .getDependsOn ().stream ()
229- .anyMatch (errored :: contains );
218+ .anyMatch (exceptionsDuringExecution :: containsKey );
230219 }
220+
221+ private WorkflowExecutionResult createReconcileResult () {
222+ WorkflowExecutionResult workflowExecutionResult = new WorkflowExecutionResult ();
223+
224+ workflowExecutionResult .setErroredDependents (exceptionsDuringExecution
225+ .entrySet ().stream ()
226+ .collect (Collectors .toMap (e -> e .getKey ().getDependentResource (), Map .Entry ::getValue )));
227+ workflowExecutionResult .setNotReadyDependents (notReady .stream ()
228+ .map (DependentResourceNode ::getDependentResource )
229+ .collect (Collectors .toList ()));
230+
231+ workflowExecutionResult .setReconciledDependents (alreadyReconciled .stream ()
232+ .map (DependentResourceNode ::getDependentResource ).collect (Collectors .toList ()));
233+
234+ var notReconciledDependentResources =
235+ new HashSet <DependentResourceNode <?, ?>>(workflow .getDependents ().keySet ());
236+ notReconciledDependentResources .removeAll (alreadyReconciled );
237+ workflowExecutionResult .setNotReconciledDependents (notReconciledDependentResources .stream ()
238+ .map (DependentResourceNode ::getDependentResource ).collect (Collectors .toList ()));
239+
240+ return workflowExecutionResult ;
241+ }
242+
231243}
0 commit comments