11package io .javaoperatorsdk .operator .processing .dependent .workflow ;
22
3+ import java .util .HashMap ;
4+ import java .util .HashSet ;
5+ import java .util .Map ;
6+ import java .util .Set ;
7+ import java .util .concurrent .Future ;
8+
39import org .slf4j .Logger ;
410import org .slf4j .LoggerFactory ;
511
@@ -10,6 +16,15 @@ public class WorkflowCleanupExecutor<P extends HasMetadata> {
1016
1117 private static final Logger log = LoggerFactory .getLogger (WorkflowReconcileExecutor .class );
1218
19+ private final Map <DependentResourceNode <?, ?>, Future <?>> actualExecutions =
20+ new HashMap <>();
21+ private final Map <DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
22+ new HashMap <>();
23+ private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
24+ private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
25+ private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
26+ new HashSet <>();
27+
1328 private final Workflow <P > workflow ;
1429 private final P primary ;
1530 private final Context <P > context ;
@@ -23,13 +38,104 @@ public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> conte
2338
2439 public synchronized WorkflowCleanupResult cleanup () {
2540 for (DependentResourceNode <?, P > dependentResourceNode : workflow
26- .getTopLevelDependentResources ()) {
41+ .getBottomLevelResource ()) {
2742 handleCleanup (dependentResourceNode , false );
2843 }
29- return null ;
44+ while (true ) {
45+ try {
46+ this .wait ();
47+ if (noMoreExecutionsScheduled ()) {
48+ break ;
49+ } else {
50+ log .warn ("Notified but still resources under execution. This should not happen." );
51+ }
52+ } catch (InterruptedException e ) {
53+ log .warn ("Thread interrupted" , e );
54+ Thread .currentThread ().interrupt ();
55+ }
56+ }
57+ return createCleanupResult ();
58+ }
59+
60+ private WorkflowCleanupResult createCleanupResult () {
61+ return new WorkflowCleanupResult ();
62+ }
63+
64+ private synchronized boolean noMoreExecutionsScheduled () {
65+ return actualExecutions .isEmpty ();
3066 }
3167
3268 private void handleCleanup (DependentResourceNode <?, P > dependentResourceNode , boolean b ) {
69+ log .debug ("Submitting for cleanup: {}" , dependentResourceNode );
70+
71+ if (alreadyVisited (dependentResourceNode )
72+ || isCleaningNow (dependentResourceNode )
73+ || !allParentsCleaned (dependentResourceNode )
74+ || hasErroredParent (dependentResourceNode )) {
75+ log .debug ("Skipping submit of: {}, " , dependentResourceNode );
76+ return ;
77+ }
78+
79+ }
80+
81+ private class NodeExecutor implements Runnable {
82+
83+ private final DependentResourceNode <?, P > dependentResourceNode ;
84+ private final boolean onlyReconcileForPossibleDelete ;
85+
86+ private NodeExecutor (DependentResourceNode <?, P > dependentResourceNode ,
87+ boolean onlyReconcileForDelete ) {
88+ this .dependentResourceNode = dependentResourceNode ;
89+ this .onlyReconcileForPossibleDelete = onlyReconcileForDelete ;
90+ }
91+
92+ @ Override
93+ @ SuppressWarnings ("unchecked" )
94+ public void run () {
95+ try {
96+
97+ } catch (RuntimeException e ) {
98+ handleExceptionInExecutor (dependentResourceNode , e );
99+ } finally {
100+ handleNodeExecutionFinish (dependentResourceNode );
101+ }
102+ }
103+ }
104+
105+ private synchronized void handleExceptionInExecutor (DependentResourceNode dependentResourceNode ,
106+ RuntimeException e ) {
107+ exceptionsDuringExecution .put (dependentResourceNode , e );
108+ }
109+
110+ private synchronized void handleNodeExecutionFinish (DependentResourceNode dependentResourceNode ) {
111+ log .debug ("Finished execution for: {}" , dependentResourceNode );
112+ actualExecutions .remove (dependentResourceNode );
113+ if (actualExecutions .isEmpty ()) {
114+ this .notifyAll ();
115+ }
116+ }
117+
118+ private boolean isCleaningNow (DependentResourceNode <?, ?> dependentResourceNode ) {
119+ return actualExecutions .containsKey (dependentResourceNode );
120+ }
121+
122+
123+ private boolean alreadyVisited (
124+ DependentResourceNode <?, ?> dependentResourceNode ) {
125+ return alreadyReconciled .contains (dependentResourceNode );
126+ }
127+
128+ private boolean allParentsCleaned (
129+ DependentResourceNode <?, ?> dependentResourceNode ) {
130+ return dependentResourceNode .getDependsOn ().isEmpty ()
131+ || dependentResourceNode .getDependsOn ().stream ()
132+ .allMatch (d -> alreadyVisited (d ) && !notReady .contains (d ));
133+ }
33134
135+ private boolean hasErroredParent (
136+ DependentResourceNode <?, ?> dependentResourceNode ) {
137+ return !dependentResourceNode .getDependsOn ().isEmpty ()
138+ && dependentResourceNode .getDependsOn ().stream ()
139+ .anyMatch (exceptionsDuringExecution ::containsKey );
34140 }
35141}
0 commit comments