From e43ec023d1e78d72c55c56d294b293c61f2ad89b Mon Sep 17 00:00:00 2001
From: pavel_silin
Date: Wed, 29 Mar 2023 16:32:26 +0300
Subject: [PATCH] issue #3131 fix for run worker nodes - cleanup run configuration for each worker run

---
Reviewer notes (commentary only, not part of the commit):

* copyAsMasterConfiguration() now passes the cloned configuration to
  updateMasterConfiguration() and returns that same object. Before this
  change the clone was returned untouched while the caller's configuration
  was the one handed to updateMasterConfiguration() on every call.
* The change replaces one token on an added line, so hunk line counts and
  the diffstat are unchanged. The post-image blob id on the
  PipelineConfigurationManager.java "index" line was computed for the
  previous content and no longer matches.
* NOTE(review): PipelineConfiguration.clone() feeds every collection field
  to a HashMap/ArrayList copy constructor, which throws
  NullPointerException for a null argument. Confirm that none of these
  fields can be null at clone time - the field declarations are not part
  of this patch.

 .../PipelineConfigurationManager.java         | 36 ++++++++++++-------
 .../manager/pipeline/PipelineRunManager.java  |  7 ++--
 .../pipeline/runner/CloudPlatformRunner.java  | 33 +++++++++++------
 .../configuration/PipelineConfiguration.java  | 20 ++++++++++-
 4 files changed, 68 insertions(+), 28 deletions(-)

diff --git a/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineConfigurationManager.java b/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineConfigurationManager.java
index d57ecdc24d..058d8704e7 100644
--- a/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineConfigurationManager.java
+++ b/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineConfigurationManager.java
@@ -308,11 +308,20 @@ public void updateMasterConfiguration(PipelineConfiguration configuration, boole
         configuration.buildEnvVariables();
     }
 
-    public void updateWorkerConfiguration(String parentId, PipelineStart runVO,
-            PipelineConfiguration configuration, boolean isNFS, boolean clearParams) {
-        configuration.setEraseRunEndpoints(hasBooleanParameter(configuration, ERASE_WORKER_ENDPOINTS));
-        final Map configParameters = MapUtils.isEmpty(configuration.getParameters()) ?
-                new HashMap<>() : configuration.getParameters();
+    public PipelineConfiguration copyAsMasterConfiguration(final PipelineConfiguration configuration,
+                                                           final boolean isNFS) {
+        final PipelineConfiguration copiedConfiguration = configuration.clone();
+        updateMasterConfiguration(copiedConfiguration, isNFS);
+        return copiedConfiguration;
+    }
+
+    public PipelineConfiguration generateWorkerConfiguration(final String parentId, final PipelineStart runVO,
+                                                             final PipelineConfiguration configuration,
+                                                             final boolean isNFS, final boolean clearParams) {
+        final PipelineConfiguration workerConfiguration = configuration.clone();
+        workerConfiguration.setEraseRunEndpoints(hasBooleanParameter(workerConfiguration, ERASE_WORKER_ENDPOINTS));
+        final Map configParameters = MapUtils.isEmpty(workerConfiguration.getParameters()) ?
+                new HashMap<>() : workerConfiguration.getParameters();
         final Map updatedParams = clearParams ? new HashMap<>() : configParameters;
         final List systemParameters = preferenceManager.getPreference(
                 SystemPreferences.LAUNCH_SYSTEM_PARAMETERS);
@@ -331,20 +340,21 @@ public void updateWorkerConfiguration(String parentId, PipelineStart runVO,
         } else {
             updatedParams.remove(NFS_CLUSTER_ROLE);
         }
-        configuration.setParameters(updatedParams);
-        configuration.setClusterRole(WORKER_CLUSTER_ROLE);
-        configuration.setCmdTemplate(StringUtils.hasText(runVO.getWorkerCmd()) ?
+        workerConfiguration.setParameters(updatedParams);
+        workerConfiguration.setClusterRole(WORKER_CLUSTER_ROLE);
+        workerConfiguration.setCmdTemplate(StringUtils.hasText(runVO.getWorkerCmd()) ?
                 runVO.getWorkerCmd() : WORKER_CMD_TEMPLATE);
-        configuration.setPrettyUrl(null);
+        workerConfiguration.setPrettyUrl(null);
         //remove node count parameter for workers
-        configuration.setNodeCount(null);
+        workerConfiguration.setNodeCount(null);
         // if podAssignPolicy is a simple policy to assign run pod to dedicated instance, then we need to cleared it
         // and workers then will be assigned to its own nodes, otherwise keep existing policy to assign workers
         // as was configured in policy object
-        if (configuration.getPodAssignPolicy().isMatch(KubernetesConstants.RUN_ID_LABEL, parentId)) {
-            configuration.setPodAssignPolicy(null);
+        if (workerConfiguration.getPodAssignPolicy().isMatch(KubernetesConstants.RUN_ID_LABEL, parentId)) {
+            workerConfiguration.setPodAssignPolicy(null);
         }
-        configuration.buildEnvVariables();
+        workerConfiguration.buildEnvVariables();
+        return workerConfiguration;
     }
 
     public boolean hasNFSParameter(PipelineConfiguration entry) {
diff --git a/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineRunManager.java b/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineRunManager.java
index 175cd89d09..9abc35f34b 100644
--- a/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineRunManager.java
+++ b/api/src/main/java/com/epam/pipeline/manager/pipeline/PipelineRunManager.java
@@ -1284,12 +1284,13 @@ private void runClusterWorkers(PipelineRun run, PipelineStart runVO, String vers
             PipelineConfiguration configuration) {
         String parentId = Long.toString(run.getId());
         Integer nodeCount = configuration.getNodeCount();
-        configurationManager.updateWorkerConfiguration(parentId, runVO, configuration, false, true);
         for (int i = 0; i < nodeCount; i++) {
+            final PipelineConfiguration workerConfiguration = configurationManager.generateWorkerConfiguration(
+                    parentId, runVO, configuration, false, true);
             launchPipeline(
-                    configuration, pipeline, version, runVO.getInstanceType(), runVO.getConfigurationName(),
+                    workerConfiguration, pipeline, version, runVO.getInstanceType(), runVO.getConfigurationName(),
                     parentId, run.getId(), null, null, runVO.getRunSids(),
-                    configuration.getNotifications()
+                    workerConfiguration.getNotifications()
             );
         }
     }
diff --git a/api/src/main/java/com/epam/pipeline/manager/pipeline/runner/CloudPlatformRunner.java b/api/src/main/java/com/epam/pipeline/manager/pipeline/runner/CloudPlatformRunner.java
index bb0a1882f4..8578891d24 100644
--- a/api/src/main/java/com/epam/pipeline/manager/pipeline/runner/CloudPlatformRunner.java
+++ b/api/src/main/java/com/epam/pipeline/manager/pipeline/runner/CloudPlatformRunner.java
@@ -164,30 +164,41 @@ private List runConfigurationEntry(RunConfigurationEntry entry,
         PipelineStart startVO = entry.toPipelineStart();
         startVO.setNotifications(notifications);
 
-        if (!StringUtils.hasText(clusterId)) {
-            log.debug("Launching master entry {}", entry.getName());
-            pipelineConfigurationManager.updateMasterConfiguration(configuration, startNFS);
-        } else {
-            log.debug("Launching worker entry {}", entry.getName());
-            pipelineConfigurationManager.updateWorkerConfiguration(clusterId, startVO, configuration, startNFS, true);
-        }
         Pipeline pipeline = entry.getPipelineId() != null ? pipelineManager.load(entry.getPipelineId()) : null;
         List result = new ArrayList<>();
         log.debug("Launching total {} copies of entry {}", copies, entry.getName());
         for (int i = 0; i < copies; i++) {
+            final PipelineConfiguration runConfiguration =
+                    buildRunConfiguration(entry, configuration, clusterId, startNFS, startVO);
             //only first node may be a NFS server
             if (i != 0) {
-                configuration.setCmdTemplate(WORKER_CMD_TEMPLATE);
-                configuration.getParameters().remove(NFS_CLUSTER_ROLE);
-                configuration.buildEnvVariables();
+                runConfiguration.setCmdTemplate(WORKER_CMD_TEMPLATE);
+                runConfiguration.getParameters().remove(NFS_CLUSTER_ROLE);
+                runConfiguration.buildEnvVariables();
             }
-            result.add(pipelineRunManager.launchPipeline(configuration, pipeline, entry.getPipelineVersion(),
+            result.add(pipelineRunManager.launchPipeline(runConfiguration, pipeline, entry.getPipelineVersion(),
                     startVO.getInstanceType(), startVO.getConfigurationName(), clusterId, null, entityIds,
                     configurationId, startVO.getRunSids(), startVO.getNotifications()));
         }
         return result;
     }
 
+    private PipelineConfiguration buildRunConfiguration(final RunConfigurationEntry entry,
+                                                        final PipelineConfiguration configuration,
+                                                        final String clusterId, final boolean startNFS,
+                                                        final PipelineStart startVO) {
+        final PipelineConfiguration runConfiguration;
+        if (!StringUtils.hasText(clusterId)) {
+            log.debug("Launching master entry {}", entry.getName());
+            runConfiguration = pipelineConfigurationManager.copyAsMasterConfiguration(configuration, startNFS);
+        } else {
+            log.debug("Launching worker entry {}", entry.getName());
+            runConfiguration = pipelineConfigurationManager
+                    .generateWorkerConfiguration(clusterId, startVO, configuration, startNFS, true);
+        }
+        return runConfiguration;
+    }
+
     @Data
     private static class SplitConfig {
 
diff --git a/core/src/main/java/com/epam/pipeline/entity/configuration/PipelineConfiguration.java b/core/src/main/java/com/epam/pipeline/entity/configuration/PipelineConfiguration.java
index ca6971bbba..fd97839055 100644
--- a/core/src/main/java/com/epam/pipeline/entity/configuration/PipelineConfiguration.java
+++ b/core/src/main/java/com/epam/pipeline/entity/configuration/PipelineConfiguration.java
@@ -32,6 +32,7 @@
 import org.apache.commons.collections4.ListUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -48,7 +49,7 @@
 @Getter
 @NoArgsConstructor
 @EqualsAndHashCode
-public class PipelineConfiguration {
+public class PipelineConfiguration implements Cloneable {
 
     private static final String MAIN_FILE = "main_file";
     private static final String MAIN_CLASS = "main_class";
@@ -226,4 +227,21 @@ private List adjustPrincipal(final List runsSids, final boolean
                 .peek(runSid -> runSid.setIsPrincipal(principal))
                 .collect(Collectors.toList());
     }
+
+    @Override
+    public PipelineConfiguration clone() {
+        try {
+            final PipelineConfiguration clone = (PipelineConfiguration) super.clone();
+            clone.setParameters(new HashMap<>(this.parameters));
+            clone.setEnvironmentParams(new HashMap<>(this.environmentParams));
+            clone.setSharedWithUsers(new ArrayList<>(this.sharedWithUsers));
+            clone.setSharedWithRoles(new ArrayList<>(this.sharedWithRoles));
+            clone.setNotifications(new ArrayList<>(this.notifications));
+            clone.setTags(new HashMap<>(this.tags));
+            clone.setKubeLabels(new HashMap<>(this.kubeLabels));
+            return clone;
+        } catch (CloneNotSupportedException e) {
+            throw new AssertionError("There was an error while trying to clone PipelineConfiguration object", e);
+        }
+    }
 }