Skip to content

Commit

Permalink
Fix unfinished step in parallel flow
Browse files Browse the repository at this point in the history
Resolves #3939

Signed-off-by: Mahmoud Ben Hassine <mbenhassine@vmware.com>
  • Loading branch information
doontagi authored and fmbenhassine committed Dec 17, 2024
1 parent bd00cde commit aafecb9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -119,7 +120,7 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception
FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor);

Collection<FlowExecution> results = new ArrayList<>();

List<Exception> exceptions = new ArrayList<>();
// Could use a CompletionService here?
for (Future<FlowExecution> task : tasks) {
try {
Expand All @@ -129,14 +130,18 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception
// Unwrap the expected exceptions
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
exceptions.add((Exception) cause);
}
else {
throw e;
exceptions.add(e);
}
}
}

if (!exceptions.isEmpty()) {
throw exceptions.get(0);
}

FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor);
if (parentSplitStatus != null) {
return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,14 @@
package org.springframework.batch.core.job.builder;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.BatchStatus;
Expand All @@ -45,6 +48,8 @@
import org.springframework.batch.core.step.StepSupport;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
Expand Down Expand Up @@ -369,4 +374,38 @@ public JdbcTransactionManager transactionManager(DataSource dataSource) {

}

@Test
public void testBuildSplitWithParallelFlow() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Step longExecutingStep = new StepBuilder("longExecutingStep", jobRepository).tasklet((stepContribution, b) -> {
Thread.sleep(500L);
return RepeatStatus.FINISHED;
}, new ResourcelessTransactionManager()).build();

Step interruptedStep = new StepBuilder("interruptedStep", jobRepository).tasklet((stepContribution, b) -> {
stepContribution.getStepExecution().setTerminateOnly();
return RepeatStatus.FINISHED;
}, new ResourcelessTransactionManager()).build();

Step nonExecutableStep = new StepBuilder("nonExecutableStep", jobRepository).tasklet((stepContribution, b) -> {
countDownLatch.countDown();
return RepeatStatus.FINISHED;
}, new ResourcelessTransactionManager()).build();

Flow twoStepFlow = new FlowBuilder<SimpleFlow>("twoStepFlow").start(longExecutingStep)
.next(nonExecutableStep)
.build();
Flow interruptedFlow = new FlowBuilder<SimpleFlow>("interruptedFlow").start(interruptedStep).build();

Flow splitFlow = new FlowBuilder<Flow>("splitFlow").split(new SimpleAsyncTaskExecutor())
.add(interruptedFlow, twoStepFlow)
.build();
FlowJobBuilder jobBuilder = new JobBuilder("job", jobRepository).start(splitFlow).build();
jobBuilder.preventRestart().build().execute(execution);

boolean isExecutedNonExecutableStep = countDownLatch.await(1, TimeUnit.SECONDS);
assertEquals(BatchStatus.STOPPED, execution.getStatus());
Assertions.assertFalse(isExecutedNonExecutableStep);
}

}

0 comments on commit aafecb9

Please sign in to comment.