Skip to content

Commit

Permalink
Edge providerId propagation and getCluster support for not-yet-submitted jobs (#110)
Browse files Browse the repository at this point in the history

* propagate edge providerId and improve deleteCluster

* getCluster to support nodes without submittedJob
  • Loading branch information
ankicabarisic authored Nov 15, 2024
1 parent bf1bd65 commit e897f8d
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,9 @@ public boolean deleteCluster(String sessionId, String clusterName) throws NotCon
Cluster toScaleCluster = getCluster(sessionId, clusterName);
for (ClusterNodeDefinition node : toScaleCluster.getNodes()) {
if (node != null) {
//check the node job state

deleteNode(sessionId, clusterName, node, "", false);
}
} else
LOGGER.warn("Cannot delete a null node.");
}
repositoryService.deleteCluster(toScaleCluster);
repositoryService.flush();
Expand Down Expand Up @@ -407,49 +406,78 @@ private String getNodeUrl(String sessionId, String clusterName, ClusterNodeDefin
private Long deleteNode(String sessionId, String clusterName, ClusterNodeDefinition node, String masterNodeToken,
boolean drain) throws NotConnectedException {
String nodeUrl = getNodeUrl(sessionId, clusterName, node);
Long jobId = -1L;
if (nodeUrl != null && !nodeUrl.isEmpty()) {
try {
if (drain) {
jobId = jobService.submitOneTaskJob(sessionId,
nodeUrl,
masterNodeToken,
"delete_node_" + node.getName(),
"drain-delete",
node.getNodeJobName(clusterName));
} else {
jobId = jobService.submitOneTaskJob(sessionId,
nodeUrl,
masterNodeToken,
"delete_node_" + node.getName(),
"delete",
"");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
LOGGER.warn("unable to delete node {}, with the node url \"{}\" !", node.getName(), node.getNodeUrl());
// return jobId;
if (nodeUrl == null || nodeUrl.isEmpty()) {
LOGGER.warn("Unable to delete node {}, as the node URL is empty or null.", node.getName());
}
Job nodeJob = repositoryService.getJob(node.getNodeJobName(clusterName));
JobState jobState = jobService.getJobState(sessionId, nodeJob.getJobId());
if (jobState.getJobStatus().isJobAlive())
jobService.killJob(sessionId, nodeJob.getJobId());
List<Task> nodeTasks = nodeJob.getTasks();
List<Deployment> nodeDeployments = new ArrayList<>();
for (Task task : nodeTasks) {
nodeDeployments.addAll(task.getDeployments());

Long jobId = -1L;
try {
// Submit the job to delete the node (either drain-delete or delete)
String jobName = "delete_node_" + node.getName();
String jobType = drain ? "drain-delete" : "delete";
String nodeJobName = drain ? node.getNodeJobName(clusterName) : "";

jobId = jobService.submitOneTaskJob(sessionId, nodeUrl, masterNodeToken, jobName, jobType, nodeJobName);

// Proceed to clean up tasks and deployments if the job was submitted
cleanupNodeJob(sessionId, clusterName, node);

} catch (IOException e) {
LOGGER.error("Failed to submit delete job for node {}: {}", node.getName(), e.getMessage());
throw new RuntimeException("Error submitting delete job for node " + node.getName(), e);
} catch (Exception e) {
LOGGER.error("Unexpected error occurred while deleting node {}: {}", node.getName(), e.getMessage());
throw new RuntimeException("Unexpected error occurred while deleting node " + node.getName(), e);
}
nodeDeployments.forEach(deployment -> repositoryService.deleteDeployment(deployment));
repositoryService.deleteJob(node.getNodeJobName(clusterName));
nodeTasks.forEach(task -> repositoryService.deleteTask(task));

repositoryService.flush();
return jobId;
}

/**
 * Removes all persisted state associated with a node's job: kills the job on the
 * scheduler if it is still alive, then deletes its deployments, tasks and finally
 * the job record itself, flushing the repository at the end.
 *
 * @param sessionId   active gateway session identifier
 * @param clusterName name of the cluster the node belongs to
 * @param node        node whose job state is to be cleaned up
 * @throws NotConnectedException if the session is not connected
 * @throws RuntimeException      if any cleanup step other than deployment deletion fails
 */
private void cleanupNodeJob(String sessionId, String clusterName, ClusterNodeDefinition node)
        throws NotConnectedException {
    Job nodeJob = repositoryService.getJob(node.getNodeJobName(clusterName));
    if (nodeJob == null) {
        LOGGER.info("No job found for node {}, skipping cleanup.", node.getName());
        return;
    }

    try {
        // Only query/kill the scheduler when the job was actually submitted.
        // NOTE(review): the guard checks getSubmittedJobId() but the scheduler calls
        // use getJobId() — confirm these identify the same job.
        if (nodeJob.getSubmittedJobId() != 0L) {
            JobState jobState = jobService.getJobState(sessionId, nodeJob.getJobId());
            if (jobState.getJobStatus().isJobAlive()) {
                jobService.killJob(sessionId, nodeJob.getJobId());
            }
        }

        // Gather all tasks and deployments for cleanup.
        List<Task> nodeTasks = nodeJob.getTasks();
        List<Deployment> nodeDeployments = new ArrayList<>();
        for (Task task : nodeTasks) {
            nodeDeployments.addAll(task.getDeployments());
        }

        // Delete deployments best-effort (a single failure should not abort the
        // rest of the cleanup), then tasks and the job record in sequence.
        nodeDeployments.forEach(deployment -> {
            try {
                repositoryService.deleteDeployment(deployment);
            } catch (Exception e) {
                // Log with the throwable so the stack trace is preserved.
                LOGGER.warn("Failed to delete deployment for node {}", node.getName(), e);
            }
        });

        repositoryService.deleteJob(node.getNodeJobName(clusterName));
        nodeTasks.forEach(task -> repositoryService.deleteTask(task));
        repositoryService.flush();

        LOGGER.info("Cleanup completed for node {}", node.getName());

    } catch (Exception e) {
        // Log with the throwable so the stack trace is preserved.
        LOGGER.error("Error occurred during cleanup for node {}", node.getName(), e);
        throw new RuntimeException("Error during cleanup for node " + node.getName(), e);
    }
}

private ClusterNodeDefinition getNodeFromCluster(Cluster cluster, String nodeName) {
for (ClusterNodeDefinition node : cluster.getNodes()) {
if (Objects.equals(node.getName(), nodeName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,20 +431,35 @@ public JobResult waitForJob(String sessionId, String jobId, long timeout) throws
public Map<String, Serializable> getJobResultMaps(String sessionId, String jobId, long timeout)
throws NotConnectedException {
if (!paGatewayService.isConnectionActive(sessionId)) {
throw new NotConnectedException();
throw new NotConnectedException("Session is not active for ID: " + sessionId);
}
// Fetch the job from the repository
Job submittedJob = repositoryService.getJob(jobId);
List<String> jobIds = new ArrayList<>();
jobIds.add(String.valueOf(submittedJob.getSubmittedJobId()));
Map<Long, Map<String, Serializable>> jobResult = schedulerGateway.getJobResultMaps(jobIds);
LOGGER.info("Results of job: " + jobId + " fetched successfully: " +
Optional.ofNullable(jobResult).map(Map<Long, Map<String, Serializable>>::toString).orElse(null));
if (jobResult != null) {
return jobResult.get(submittedJob.getSubmittedJobId());
} else {
return new HashMap();
if (submittedJob == null) {
LOGGER.warn("No job found with ID: {}", jobId);
return Collections.emptyMap(); // Return an empty map if no job is found
}

// Prepare the job ID list for the scheduler query
List<String> jobIds = Collections.singletonList(String.valueOf(submittedJob.getSubmittedJobId()));

// Fetch job results
Map<Long, Map<String, Serializable>> jobResult;
try {
jobResult = schedulerGateway.getJobResultMaps(jobIds);
} catch (Exception e) {
LOGGER.error("Failed to fetch results for job ID: {}. Error: {}", jobId, e.getMessage(), e);
throw new RuntimeException("Error fetching job results for job ID: " + jobId, e);
}

// Log and return results
LOGGER.info("Results of job {} fetched successfully: {}",
jobId,
Optional.ofNullable(jobResult).map(Object::toString).orElse("No results available"));

// Return the result for the specific job ID, or an empty map if no results are available
return jobResult != null ? jobResult.getOrDefault(submittedJob.getSubmittedJobId(), Collections.emptyMap())
: Collections.emptyMap();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static NodeCandidate createNodeCandidate(NodeProperties np, String jobId,
hardware.setDisk((double) np.getDisk());
hardware.setRam(np.getMemory());
hardware.setFpga("");
hardware.setProviderId(np.getProviderId());
//Define the location
Location location = new Location();
location.setGeoLocation(np.getGeoLocation());
Expand Down

0 comments on commit e897f8d

Please sign in to comment.