1111import io .javaoperatorsdk .operator .api .reconciler .Context ;
1212import io .javaoperatorsdk .operator .api .reconciler .dependent .Deleter ;
1313import io .javaoperatorsdk .operator .api .reconciler .dependent .DependentResource ;
14+ import io .javaoperatorsdk .operator .api .reconciler .dependent .GarbageCollected ;
1415
1516public class WorkflowReconcileExecutor <P extends HasMetadata > {
1617
1718 private static final Logger log = LoggerFactory .getLogger (WorkflowReconcileExecutor .class );
1819
1920 private final Workflow <P > workflow ;
2021
22+ /** Covers both deleted and reconciled */
2123 private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
2224 private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
23- private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
24- new HashSet <>();
2525 private final Map <DependentResourceNode <?, ?>, Future <?>> actualExecutions =
2626 new HashMap <>();
2727 private final Map <DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
2828 new HashMap <>();
2929
30+ private final Set <DependentResourceNode <?, ?>> markedForDelete = new HashSet <>();
31+ private final Set <DependentResourceNode <?, ?>> deleteConditionNotMet = new HashSet <>();
32+
3033 private final P primary ;
3134 private final Context <P > context ;
3235
@@ -43,7 +46,7 @@ public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> con
4346 public synchronized WorkflowExecutionResult reconcile () {
4447 for (DependentResourceNode <?, P > dependentResourceNode : workflow
4548 .getTopLevelDependentResources ()) {
46- handleReconcile (dependentResourceNode , false );
49+ handleReconcile (dependentResourceNode );
4750 }
4851 while (true ) {
4952 try {
@@ -62,32 +65,58 @@ public synchronized WorkflowExecutionResult reconcile() {
6265 }
6366
6467 private synchronized void handleReconcile (
65- DependentResourceNode <?, P > dependentResourceNode ,
66- boolean onlyReconcileForPossibleDelete ) {
68+ DependentResourceNode <?, P > dependentResourceNode ) {
6769 log .debug ("Submitting for reconcile: {}" , dependentResourceNode );
6870
6971 if (alreadyReconciled (dependentResourceNode )
7072 || isReconcilingNow (dependentResourceNode )
7173 || !allParentsReconciledAndReady (dependentResourceNode )
74+ || markedForDelete .contains (dependentResourceNode )
7275 || hasErroredParent (dependentResourceNode )) {
7376 log .debug ("Skipping submit of: {}, " , dependentResourceNode );
7477 return ;
7578 }
7679
77- if (onlyReconcileForPossibleDelete ) {
78- ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
80+ boolean reconcileConditionMet = dependentResourceNode .getReconcileCondition ().map (
81+ rc -> rc .isMet (dependentResourceNode .getDependentResource (), primary , context ))
82+ .orElse (true );
83+
84+ if (!reconcileConditionMet ) {
85+ handleReconcileConditionNotMet (dependentResourceNode );
7986 } else {
80- dependentResourceNode .getReconcileCondition ()
81- .ifPresent (reconcileCondition -> handleReconcileCondition (dependentResourceNode ,
82- reconcileCondition ));
87+ Future <?> nodeFuture =
88+ workflow
89+ .getExecutorService ()
90+ .submit (
91+ new NodeReconcileExecutor (
92+ dependentResourceNode ));
93+ actualExecutions .put (dependentResourceNode , nodeFuture );
94+ log .debug ("Submitted to reconcile: {}" , dependentResourceNode );
95+ }
96+ }
97+
98+ private void handleDelete (DependentResourceNode dependentResourceNode ) {
99+ log .debug ("Submitting for delete: {}" , dependentResourceNode );
100+
101+ if (alreadyReconciled (dependentResourceNode )
102+ || isReconcilingNow (dependentResourceNode )
103+ || !markedForDelete .contains (dependentResourceNode )
104+ || !allDependentsDeletedAlready (dependentResourceNode )) {
105+ log .debug ("Skipping submit for delete of: {}, " , dependentResourceNode );
106+ return ;
83107 }
84108
85109 Future <?> nodeFuture =
86- workflow .getExecutorService ().submit (
87- new NodeExecutor (dependentResourceNode ,
88- ownOrParentsReconcileConditionNotMet (dependentResourceNode )));
110+ workflow .getExecutorService ()
111+ .submit (new NodeDeleteExecutor (dependentResourceNode ));
89112 actualExecutions .put (dependentResourceNode , nodeFuture );
90- log .debug ("Submitted to reconcile: {}" , dependentResourceNode );
113+ log .debug ("Submitted to delete: {}" , dependentResourceNode );
114+ }
115+
116+ private boolean allDependentsDeletedAlready (DependentResourceNode dependentResourceNode ) {
117+ var dependents = workflow .getDependents (dependentResourceNode );
118+ return dependents .stream ().allMatch (d -> alreadyReconciled .contains (d ) && !notReady .contains (d )
119+ && !exceptionsDuringExecution .containsKey (d ));
91120 }
92121
93122
@@ -112,22 +141,12 @@ private synchronized void setAlreadyReconciledButNotReady(
112141 notReady .add (dependentResourceNode );
113142 }
114143
115- private boolean ownOrParentsReconcileConditionNotMet (
116- DependentResourceNode <?, ?> dependentResourceNode ) {
117- return ownOrAncestorReconcileConditionConditionNotMet .contains (dependentResourceNode ) ||
118- dependentResourceNode .getDependsOn ().stream ()
119- .anyMatch (ownOrAncestorReconcileConditionConditionNotMet ::contains );
120- }
121-
122- private class NodeExecutor implements Runnable {
144+ private class NodeReconcileExecutor implements Runnable {
123145
124146 private final DependentResourceNode <?, P > dependentResourceNode ;
125- private final boolean onlyReconcileForPossibleDelete ;
126147
127- private NodeExecutor (DependentResourceNode <?, P > dependentResourceNode ,
128- boolean onlyReconcileForDelete ) {
148+ private NodeReconcileExecutor (DependentResourceNode <?, P > dependentResourceNode ) {
129149 this .dependentResourceNode = dependentResourceNode ;
130- this .onlyReconcileForPossibleDelete = onlyReconcileForDelete ;
131150 }
132151
133152 @ Override
@@ -136,23 +155,17 @@ public void run() {
136155 try {
137156 DependentResource dependentResource = dependentResourceNode .getDependentResource ();
138157 boolean ready = true ;
139- if (onlyReconcileForPossibleDelete ) {
140- if (dependentResource instanceof Deleter ) {
141- ((Deleter <P >) dependentResource ).delete (primary , context );
142- }
143- } else {
144- dependentResource .reconcile (primary , context );
145- if (dependentResourceNode .getReadyCondition ().isPresent ()
146- && !dependentResourceNode .getReadyCondition ().get ()
147- .isMet (dependentResource , primary , context )) {
148- ready = false ;
149- }
150- }
151158
159+ dependentResource .reconcile (primary , context );
160+ if (dependentResourceNode .getReadyCondition ().isPresent ()
161+ && !dependentResourceNode .getReadyCondition ().get ()
162+ .isMet (dependentResource , primary , context )) {
163+ ready = false ;
164+ }
152165 if (ready ) {
153166 log .debug ("Setting already reconciled for: {}" , dependentResourceNode );
154167 alreadyReconciled .add (dependentResourceNode );
155- handleDependentsReconcile (dependentResourceNode , onlyReconcileForPossibleDelete );
168+ handleDependentsReconcile (dependentResourceNode );
156169 } else {
157170 setAlreadyReconciledButNotReady (dependentResourceNode );
158171 }
@@ -164,16 +177,59 @@ public void run() {
164177 }
165178 }
166179
180+ private class NodeDeleteExecutor implements Runnable {
181+
182+ private final DependentResourceNode <?, P > dependentResourceNode ;
183+
184+ private NodeDeleteExecutor (DependentResourceNode <?, P > dependentResourceNode ) {
185+ this .dependentResourceNode = dependentResourceNode ;
186+ }
187+
188+ @ Override
189+ @ SuppressWarnings ("unchecked" )
190+ public void run () {
191+ try {
192+ DependentResource dependentResource = dependentResourceNode .getDependentResource ();
193+ var deletePostCondition = dependentResourceNode .getDeletePostCondition ();
194+
195+ if (dependentResource instanceof Deleter
196+ && !(dependentResource instanceof GarbageCollected )) {
197+ ((Deleter <P >) dependentResourceNode .getDependentResource ()).delete (primary , context );
198+ }
199+ alreadyReconciled .add (dependentResourceNode );
200+ boolean deletePostConditionMet =
201+ deletePostCondition .map (c -> c .isMet (dependentResource , primary , context )).orElse (true );
202+ if (deletePostConditionMet ) {
203+ handleDependentDeleted (dependentResourceNode );
204+ } else {
205+ deleteConditionNotMet .add (dependentResourceNode );
206+ }
207+ } catch (RuntimeException e ) {
208+ handleExceptionInExecutor (dependentResourceNode , e );
209+ } finally {
210+ handleNodeExecutionFinish (dependentResourceNode );
211+ }
212+ }
213+ }
214+
215+ private synchronized void handleDependentDeleted (
216+ DependentResourceNode <?, P > dependentResourceNode ) {
217+ dependentResourceNode .getDependsOn ().forEach (dr -> {
218+ log .debug ("Handle deleted for: {} with dependent: {}" , dr , dependentResourceNode );
219+ handleDelete (dr );
220+ });
221+ }
222+
167223 private boolean isReconcilingNow (DependentResourceNode <?, ?> dependentResourceNode ) {
168224 return actualExecutions .containsKey (dependentResourceNode );
169225 }
170226
171227 private synchronized void handleDependentsReconcile (
172- DependentResourceNode <?, P > dependentResourceNode , boolean onlyReconcileForPossibleDelete ) {
228+ DependentResourceNode <?, P > dependentResourceNode ) {
173229 var dependents = workflow .getDependents (dependentResourceNode );
174230 dependents .forEach (d -> {
175231 log .debug ("Handle reconcile for dependent: {} of parent:{}" , d , dependentResourceNode );
176- handleReconcile (d , onlyReconcileForPossibleDelete );
232+ handleReconcile (d );
177233 });
178234 }
179235
@@ -187,12 +243,20 @@ private boolean alreadyReconciled(
187243 }
188244
189245
190- private void handleReconcileCondition (DependentResourceNode <?, ?> dependentResourceNode ,
191- Condition reconcileCondition ) {
192- boolean conditionMet =
193- reconcileCondition .isMet (dependentResourceNode .getDependentResource (), primary , context );
194- if (!conditionMet ) {
195- ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
246+ private void handleReconcileConditionNotMet (DependentResourceNode <?, ?> dependentResourceNode ) {
247+ Set <DependentResourceNode > bottomNodes = new HashSet <>();
248+ markDependentsForDelete (dependentResourceNode , bottomNodes );
249+ bottomNodes .forEach (bn -> handleDelete (bn ));
250+ }
251+
252+ private void markDependentsForDelete (DependentResourceNode <?, ?> dependentResourceNode ,
253+ Set <DependentResourceNode > bottomNodes ) {
254+ markedForDelete .add (dependentResourceNode );
255+ var dependents = workflow .getDependents (dependentResourceNode );
256+ if (dependents .isEmpty ()) {
257+ bottomNodes .add (dependentResourceNode );
258+ } else {
259+ dependents .forEach (d -> markDependentsForDelete (d , bottomNodes ));
196260 }
197261 }
198262
0 commit comments