11package io .javaoperatorsdk .operator .processing .dependent .workflow ;
22
3- import java .util .ArrayList ;
4- import java .util .List ;
5- import java .util .Map ;
3+ import java .util .*;
64import java .util .concurrent .ConcurrentHashMap ;
75import java .util .concurrent .ExecutorService ;
86import java .util .concurrent .Executors ;
1816 */
1917public class Workflow <P extends HasMetadata > {
2018
21- private final List <DependentResourceNode <?, P >> dependentResourceNodes ;
22- private final List <DependentResourceNode <?, P >> topLevelResources = new ArrayList <>();
19+ private final Set <DependentResourceNode <?, P >> dependentResourceNodes ;
20+ private final Set <DependentResourceNode <?, P >> topLevelResources = new HashSet <>();
21+ private final Set <DependentResourceNode <?, P >> bottomLevelResource = new HashSet <>();
2322 private Map <DependentResourceNode <?, P >, List <DependentResourceNode <?, P >>> dependents ;
2423
2524 // it's "global" executor service shared between multiple reconciliations running parallel
2625 private ExecutorService executorService ;
2726
28- public Workflow (List <DependentResourceNode <?, P >> dependentResourceNodes ) {
27+ public Workflow (Set <DependentResourceNode <?, P >> dependentResourceNodes ) {
2928 this .executorService = ConfigurationServiceProvider .instance ().getExecutorService ();
3029 this .dependentResourceNodes = dependentResourceNodes ;
3130 preprocessForReconcile ();
3231 }
3332
34- public Workflow (List <DependentResourceNode <?, P >> dependentResourceNodes ,
33+ public Workflow (Set <DependentResourceNode <?, P >> dependentResourceNodes ,
3534 ExecutorService executorService ) {
3635 this .executorService = executorService ;
3736 this .dependentResourceNodes = dependentResourceNodes ;
3837 preprocessForReconcile ();
3938 }
4039
41- public Workflow (List <DependentResourceNode <?, P >> dependentResourceNodes , int globalParallelism ) {
40+ public Workflow (Set <DependentResourceNode <?, P >> dependentResourceNodes , int globalParallelism ) {
4241 this (dependentResourceNodes , Executors .newFixedThreadPool (globalParallelism ));
4342 }
4443
4544 public WorkflowExecutionResult reconcile (P primary , Context <P > context ) {
46- WorkflowReconcileExecutor workflowReconcileExecutor =
45+ WorkflowReconcileExecutor < P > workflowReconcileExecutor =
4746 new WorkflowReconcileExecutor <>(this , primary , context );
4847 return workflowReconcileExecutor .reconcile ();
4948 }
5049
51- public void cleanup (P resource , Context <P > context ) {
52-
50+ public WorkflowCleanupResult cleanup (P primary , Context <P > context ) {
51+ WorkflowCleanupExecutor <P > workflowCleanupExecutor =
52+ new WorkflowCleanupExecutor <>(this , primary , context );
53+ return workflowCleanupExecutor .cleanup ();
5354 }
5455
5556 // add cycle detection?
5657 private void preprocessForReconcile () {
58+ bottomLevelResource .addAll (dependentResourceNodes );
5759 dependents = new ConcurrentHashMap <>(dependentResourceNodes .size ());
5860 for (DependentResourceNode <?, P > node : dependentResourceNodes ) {
5961 if (node .getDependsOn ().isEmpty ()) {
@@ -62,6 +64,7 @@ private void preprocessForReconcile() {
6264 for (DependentResourceNode <?, P > dependsOn : node .getDependsOn ()) {
6365 dependents .computeIfAbsent (dependsOn , dr -> new ArrayList <>());
6466 dependents .get (dependsOn ).add (node );
67+ bottomLevelResource .remove (dependsOn );
6568 }
6669 }
6770 }
@@ -71,10 +74,14 @@ public void setExecutorService(ExecutorService executorService) {
7174 this .executorService = executorService ;
7275 }
7376
74- List <DependentResourceNode <?, P >> getTopLevelDependentResources () {
77+ Set <DependentResourceNode <?, P >> getTopLevelDependentResources () {
7578 return topLevelResources ;
7679 }
7780
81+ Set <DependentResourceNode <?, P >> getBottomLevelResource () {
82+ return bottomLevelResource ;
83+ }
84+
7885 Map <DependentResourceNode <?, P >, List <DependentResourceNode <?, P >>> getDependents () {
7986 return dependents ;
8087 }
0 commit comments