Skip to content

Commit

Permalink
issue #3131 fix for run worker nodes - cleanup run configuration for …
Browse files Browse the repository at this point in the history
…each worker run
  • Loading branch information
SilinPavel committed Mar 30, 2023
1 parent 0823f33 commit e43ec02
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,20 @@ public void updateMasterConfiguration(PipelineConfiguration configuration, boole
configuration.buildEnvVariables();
}

public void updateWorkerConfiguration(String parentId, PipelineStart runVO,
PipelineConfiguration configuration, boolean isNFS, boolean clearParams) {
configuration.setEraseRunEndpoints(hasBooleanParameter(configuration, ERASE_WORKER_ENDPOINTS));
final Map<String, PipeConfValueVO> configParameters = MapUtils.isEmpty(configuration.getParameters()) ?
new HashMap<>() : configuration.getParameters();
public PipelineConfiguration copyAsMasterConfiguration(final PipelineConfiguration configuration,
final boolean isNFS) {
final PipelineConfiguration copiedConfiguration = configuration.clone();
updateMasterConfiguration(configuration, isNFS);
return copiedConfiguration;
}

public PipelineConfiguration generateWorkerConfiguration(final String parentId, final PipelineStart runVO,
final PipelineConfiguration configuration,
final boolean isNFS, final boolean clearParams) {
final PipelineConfiguration workerConfiguration = configuration.clone();
workerConfiguration.setEraseRunEndpoints(hasBooleanParameter(workerConfiguration, ERASE_WORKER_ENDPOINTS));
final Map<String, PipeConfValueVO> configParameters = MapUtils.isEmpty(workerConfiguration.getParameters()) ?
new HashMap<>() : workerConfiguration.getParameters();
final Map<String, PipeConfValueVO> updatedParams = clearParams ? new HashMap<>() : configParameters;
final List<DefaultSystemParameter> systemParameters = preferenceManager.getPreference(
SystemPreferences.LAUNCH_SYSTEM_PARAMETERS);
Expand All @@ -331,20 +340,21 @@ public void updateWorkerConfiguration(String parentId, PipelineStart runVO,
} else {
updatedParams.remove(NFS_CLUSTER_ROLE);
}
configuration.setParameters(updatedParams);
configuration.setClusterRole(WORKER_CLUSTER_ROLE);
configuration.setCmdTemplate(StringUtils.hasText(runVO.getWorkerCmd()) ?
workerConfiguration.setParameters(updatedParams);
workerConfiguration.setClusterRole(WORKER_CLUSTER_ROLE);
workerConfiguration.setCmdTemplate(StringUtils.hasText(runVO.getWorkerCmd()) ?
runVO.getWorkerCmd() : WORKER_CMD_TEMPLATE);
configuration.setPrettyUrl(null);
workerConfiguration.setPrettyUrl(null);
//remove node count parameter for workers
configuration.setNodeCount(null);
workerConfiguration.setNodeCount(null);
// if podAssignPolicy is a simple policy to assign run pod to dedicated instance, then we need to cleared it
// and workers then will be assigned to its own nodes, otherwise keep existing policy to assign workers
// as was configured in policy object
if (configuration.getPodAssignPolicy().isMatch(KubernetesConstants.RUN_ID_LABEL, parentId)) {
configuration.setPodAssignPolicy(null);
if (workerConfiguration.getPodAssignPolicy().isMatch(KubernetesConstants.RUN_ID_LABEL, parentId)) {
workerConfiguration.setPodAssignPolicy(null);
}
configuration.buildEnvVariables();
workerConfiguration.buildEnvVariables();
return workerConfiguration;
}

public boolean hasNFSParameter(PipelineConfiguration entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1284,12 +1284,13 @@ private void runClusterWorkers(PipelineRun run, PipelineStart runVO, String vers
PipelineConfiguration configuration) {
String parentId = Long.toString(run.getId());
Integer nodeCount = configuration.getNodeCount();
configurationManager.updateWorkerConfiguration(parentId, runVO, configuration, false, true);
for (int i = 0; i < nodeCount; i++) {
final PipelineConfiguration workerConfiguration = configurationManager.generateWorkerConfiguration(
parentId, runVO, configuration, false, true);
launchPipeline(
configuration, pipeline, version, runVO.getInstanceType(), runVO.getConfigurationName(),
workerConfiguration, pipeline, version, runVO.getInstanceType(), runVO.getConfigurationName(),
parentId, run.getId(), null, null, runVO.getRunSids(),
configuration.getNotifications()
workerConfiguration.getNotifications()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,30 +164,41 @@ private List<PipelineRun> runConfigurationEntry(RunConfigurationEntry entry,

PipelineStart startVO = entry.toPipelineStart();
startVO.setNotifications(notifications);
if (!StringUtils.hasText(clusterId)) {
log.debug("Launching master entry {}", entry.getName());
pipelineConfigurationManager.updateMasterConfiguration(configuration, startNFS);
} else {
log.debug("Launching worker entry {}", entry.getName());
pipelineConfigurationManager.updateWorkerConfiguration(clusterId, startVO, configuration, startNFS, true);
}
Pipeline pipeline = entry.getPipelineId() != null ? pipelineManager.load(entry.getPipelineId()) : null;
List<PipelineRun> result = new ArrayList<>();
log.debug("Launching total {} copies of entry {}", copies, entry.getName());
for (int i = 0; i < copies; i++) {
final PipelineConfiguration runConfiguration =
buildRunConfiguration(entry, configuration, clusterId, startNFS, startVO);
//only first node may be a NFS server
if (i != 0) {
configuration.setCmdTemplate(WORKER_CMD_TEMPLATE);
configuration.getParameters().remove(NFS_CLUSTER_ROLE);
configuration.buildEnvVariables();
runConfiguration.setCmdTemplate(WORKER_CMD_TEMPLATE);
runConfiguration.getParameters().remove(NFS_CLUSTER_ROLE);
runConfiguration.buildEnvVariables();
}
result.add(pipelineRunManager.launchPipeline(configuration, pipeline, entry.getPipelineVersion(),
result.add(pipelineRunManager.launchPipeline(runConfiguration, pipeline, entry.getPipelineVersion(),
startVO.getInstanceType(), startVO.getConfigurationName(), clusterId,
null, entityIds, configurationId, startVO.getRunSids(), startVO.getNotifications()));
}
return result;
}

private PipelineConfiguration buildRunConfiguration(final RunConfigurationEntry entry,
final PipelineConfiguration configuration,
final String clusterId, final boolean startNFS,
final PipelineStart startVO) {
final PipelineConfiguration runConfiguration;
if (!StringUtils.hasText(clusterId)) {
log.debug("Launching master entry {}", entry.getName());
runConfiguration = pipelineConfigurationManager.copyAsMasterConfiguration(configuration, startNFS);
} else {
log.debug("Launching worker entry {}", entry.getName());
runConfiguration = pipelineConfigurationManager
.generateWorkerConfiguration(clusterId, startVO, configuration, startNFS, true);
}
return runConfiguration;
}

@Data
private static class SplitConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.collections4.ListUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -48,7 +49,7 @@
@Getter
@NoArgsConstructor
@EqualsAndHashCode
public class PipelineConfiguration {
public class PipelineConfiguration implements Cloneable {

private static final String MAIN_FILE = "main_file";
private static final String MAIN_CLASS = "main_class";
Expand Down Expand Up @@ -226,4 +227,21 @@ private List<RunSid> adjustPrincipal(final List<RunSid> runsSids, final boolean
.peek(runSid -> runSid.setIsPrincipal(principal))
.collect(Collectors.toList());
}

@Override
public PipelineConfiguration clone() {
try {
final PipelineConfiguration clone = (PipelineConfiguration) super.clone();
clone.setParameters(new HashMap<>(this.parameters));
clone.setEnvironmentParams(new HashMap<>(this.environmentParams));
clone.setSharedWithUsers(new ArrayList<>(this.sharedWithUsers));
clone.setSharedWithRoles(new ArrayList<>(this.sharedWithRoles));
clone.setNotifications(new ArrayList<>(this.notifications));
clone.setTags(new HashMap<>(this.tags));
clone.setKubeLabels(new HashMap<>(this.kubeLabels));
return clone;
} catch (CloneNotSupportedException e) {
throw new AssertionError("There was an error while trying to clone PipelineConfiguration object", e);
}
}
}

0 comments on commit e43ec02

Please sign in to comment.