1111
1212import io .fabric8 .kubernetes .api .model .HasMetadata ;
1313import io .javaoperatorsdk .operator .api .reconciler .Context ;
14+ import io .javaoperatorsdk .operator .api .reconciler .dependent .Deleter ;
1415
1516public class WorkflowCleanupExecutor <P extends HasMetadata > {
1617
@@ -20,10 +21,8 @@ public class WorkflowCleanupExecutor<P extends HasMetadata> {
2021 new HashMap <>();
2122 private final Map <DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
2223 new HashMap <>();
23- private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
24+ private final Set <DependentResourceNode <?, ?>> alreadyVisited = new HashSet <>();
2425 private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
25- private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
26- new HashSet <>();
2726
2827 private final Workflow <P > workflow ;
2928 private final P primary ;
@@ -35,11 +34,13 @@ public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> conte
3534 this .context = context ;
3635 }
3736
37+ // todo cleanup condition
38+ // todo error handling
3839
3940 public synchronized WorkflowCleanupResult cleanup () {
4041 for (DependentResourceNode <?, P > dependentResourceNode : workflow
4142 .getBottomLevelResource ()) {
42- handleCleanup (dependentResourceNode , false );
43+ handleCleanup (dependentResourceNode );
4344 }
4445 while (true ) {
4546 try {
@@ -57,15 +58,11 @@ public synchronized WorkflowCleanupResult cleanup() {
5758 return createCleanupResult ();
5859 }
5960
60- private WorkflowCleanupResult createCleanupResult () {
61- return new WorkflowCleanupResult ();
62- }
63-
6461 private synchronized boolean noMoreExecutionsScheduled () {
6562 return actualExecutions .isEmpty ();
6663 }
6764
68- private void handleCleanup (DependentResourceNode <?, P > dependentResourceNode , boolean b ) {
65+ private synchronized void handleCleanup (DependentResourceNode <?, P > dependentResourceNode ) {
6966 log .debug ("Submitting for cleanup: {}" , dependentResourceNode );
7067
7168 if (alreadyVisited (dependentResourceNode )
@@ -76,24 +73,30 @@ private void handleCleanup(DependentResourceNode<?, P> dependentResourceNode, bo
7673 return ;
7774 }
7875
76+ Future <?> nodeFuture =
77+ workflow .getExecutorService ().submit (
78+ new NodeExecutor (dependentResourceNode ));
79+ actualExecutions .put (dependentResourceNode , nodeFuture );
80+ log .debug ("Submitted to reconcile: {}" , dependentResourceNode );
7981 }
8082
8183 private class NodeExecutor implements Runnable {
8284
8385 private final DependentResourceNode <?, P > dependentResourceNode ;
84- private final boolean onlyReconcileForPossibleDelete ;
8586
86- private NodeExecutor (DependentResourceNode <?, P > dependentResourceNode ,
87- boolean onlyReconcileForDelete ) {
87+ private NodeExecutor (DependentResourceNode <?, P > dependentResourceNode ) {
8888 this .dependentResourceNode = dependentResourceNode ;
89- this .onlyReconcileForPossibleDelete = onlyReconcileForDelete ;
9089 }
9190
9291 @ Override
9392 @ SuppressWarnings ("unchecked" )
9493 public void run () {
9594 try {
96-
95+ if (dependentResourceNode .getDependentResource () instanceof Deleter ) {
96+ // todo check if not garbage collected
97+ ((Deleter <P >) dependentResourceNode .getDependentResource ()).delete (primary , context );
98+ }
99+ handleDependentCleaned (dependentResourceNode );
97100 } catch (RuntimeException e ) {
98101 handleExceptionInExecutor (dependentResourceNode , e );
99102 } finally {
@@ -102,12 +105,26 @@ public void run() {
102105 }
103106 }
104107
105- private synchronized void handleExceptionInExecutor (DependentResourceNode dependentResourceNode ,
108+ @ SuppressWarnings ("unchecked" )
109+ private synchronized void handleDependentCleaned (
110+ DependentResourceNode <?, P > dependentResourceNode ) {
111+ var dependOns = dependentResourceNode .getDependsOn ();
112+ if (dependOns != null ) {
113+ dependOns .forEach (d -> {
114+ log .debug ("Handle cleanup for dependent: {} of parent:{}" , d , dependentResourceNode );
115+ handleCleanup (d );
116+ });
117+ }
118+ }
119+
120+ private synchronized void handleExceptionInExecutor (
121+ DependentResourceNode <?, P > dependentResourceNode ,
106122 RuntimeException e ) {
107123 exceptionsDuringExecution .put (dependentResourceNode , e );
108124 }
109125
110- private synchronized void handleNodeExecutionFinish (DependentResourceNode dependentResourceNode ) {
126+ private synchronized void handleNodeExecutionFinish (
127+ DependentResourceNode <?, P > dependentResourceNode ) {
111128 log .debug ("Finished execution for: {}" , dependentResourceNode );
112129 actualExecutions .remove (dependentResourceNode );
113130 if (actualExecutions .isEmpty ()) {
@@ -119,23 +136,27 @@ private boolean isCleaningNow(DependentResourceNode<?, ?> dependentResourceNode)
119136 return actualExecutions .containsKey (dependentResourceNode );
120137 }
121138
122-
123139 private boolean alreadyVisited (
124140 DependentResourceNode <?, ?> dependentResourceNode ) {
125- return alreadyReconciled .contains (dependentResourceNode );
141+ return alreadyVisited .contains (dependentResourceNode );
126142 }
127143
128144 private boolean allParentsCleaned (
129145 DependentResourceNode <?, ?> dependentResourceNode ) {
130- return dependentResourceNode .getDependsOn ().isEmpty ()
131- || dependentResourceNode .getDependsOn ().stream ()
146+ var parents = workflow .getDependents ().get (dependentResourceNode );
147+ return parents .isEmpty ()
148+ || parents .stream ()
132149 .allMatch (d -> alreadyVisited (d ) && !notReady .contains (d ));
133150 }
134151
135152 private boolean hasErroredParent (
136153 DependentResourceNode <?, ?> dependentResourceNode ) {
137- return !dependentResourceNode .getDependsOn ().isEmpty ()
138- && dependentResourceNode .getDependsOn ().stream ()
139- .anyMatch (exceptionsDuringExecution ::containsKey );
154+ var parents = workflow .getDependents ().get (dependentResourceNode );
155+ return !parents .isEmpty ()
156+ && parents .stream ().anyMatch (exceptionsDuringExecution ::containsKey );
157+ }
158+
159+ private WorkflowCleanupResult createCleanupResult () {
160+ return new WorkflowCleanupResult ();
140161 }
141162}
0 commit comments