diff --git a/jbatch/pom.xml b/jbatch/pom.xml index d59f0fe..bb8e727 100644 --- a/jbatch/pom.xml +++ b/jbatch/pom.xml @@ -97,7 +97,7 @@ com.ibm.jbatch.tck com.ibm.jbatch.tck - 1.1-b02 + 1.1-b03 test @@ -118,7 +118,7 @@ com.ibm.jbatch.tck com.ibm.jbatch.tck.spi - 1.1-b02 + 1.1-b03 test diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java index 16ed31d..27398c5 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java @@ -67,12 +67,16 @@ public abstract class BaseStepController implements ExecutionElementController { protected StepContextImpl stepContext; protected Step step; + protected String stepName; protected StepStatus stepStatus; protected BlockingQueue analyzerStatusQueue = null; protected long rootJobExecutionId; + // Restart of partitioned steps needs to be handled specially + protected boolean restartAfterCompletion = false; + protected final BatchKernelService kernelService; protected final PersistenceManagerService persistenceManagerService; private final JobStatusManagerService statusManagerService; @@ -90,7 +94,7 @@ protected BaseStepController(final RuntimeJobExecution jobExecution, final Step throw new IllegalArgumentException("Step parameter to ctor cannot be null."); } this.step = step; - + this.stepName = step.getId(); this.txService = servicesManager.service(TransactionManagementService.class); this.kernelService = servicesManager.service(BatchKernelService.class); this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class); @@ -311,6 +315,8 @@ private boolean shouldStepBeExecutedOnRestart() { // boolean, but it should default to 'false', which is the spec'd default. if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) { return false; + } else { + restartAfterCompletion = true; } } @@ -364,6 +370,10 @@ protected void persistUserData() { statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus); } + protected boolean isRestartExecution() { + return stepStatus.getStartCount() > 1; + } + protected void persistExitStatusAndEndTimestamp() { stepStatus.setExitStatus(stepContext.getExitStatus()); statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus); diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java index e072a73..c34f90c 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java @@ -71,12 +71,32 @@ public class PartitionedStepController extends BaseStepController { private PartitionReducer partitionReducerProxy = null; + private enum ExecutionType { + /** + * First execution of this step for the job instance (among all job executions) + */ + START, + /** + * Step previously executed but did not complete successfully, override=false so continue from previous partitions' checkpoints, etc. + */ + RESTART_NORMAL, + /** + * Step previously executed but did not complete successfully, override=true so start with an entire set of new partitions, checkpoints, etc. + */ + RESTART_OVERRIDE, + /** + * Step previously completed, but we are re-executing with an entire set of new partitions, checkpoints, etc. + */ + RESTART_AFTER_COMPLETION}; + private ExecutionType executionType = null; + // On invocation this will be re-primed to reflect already-completed partitions from a previous execution. int numPreviouslyCompleted = 0; private PartitionAnalyzer analyzerProxy = null; final List subJobs = new ArrayList(); + protected List stepListeners = null; List completedWork = new ArrayList(); @@ -180,7 +200,7 @@ private PartitionPlan generatePartitionPlan() { int numPartitions = Integer.MIN_VALUE; int numThreads; Properties[] partitionProps = null; - + if (partitionsAttr != null) { try { numPartitions = Integer.parseInt(partitionsAttr); @@ -247,14 +267,47 @@ private PartitionPlan generatePartitionPlan() { return plan; } + private void calculateExecutionType() { + // We want to ignore override on the initial execution + if (isRestartExecution()) { + if (restartAfterCompletion) { + executionType = ExecutionType.RESTART_AFTER_COMPLETION; + } else if (plan.getPartitionsOverride()) { + executionType = ExecutionType.RESTART_OVERRIDE; + } else { + executionType = ExecutionType.RESTART_NORMAL; + } + } else { + executionType = ExecutionType.START; + } + } + + private void validateNumberOfPartitions() { + + int currentPlanSize = plan.getPartitions(); + + if (executionType == ExecutionType.RESTART_NORMAL) { + int previousPlanSize = stepStatus.getNumPartitions(); + if (previousPlanSize > 0 && previousPlanSize != currentPlanSize) { + String msg = "On a normal restart, the plan on restart specified: " + currentPlanSize + " # of partitions, but the previous " + + "executions' plan specified a different number: " + previousPlanSize + " # of partitions. Failing job."; + throw new IllegalStateException(msg); + } + } + + //persist the partition plan so on restart we have the same plan to reuse + stepStatus.setNumPartitions(currentPlanSize); + } + @Override protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException { this.plan = this.generatePartitionPlan(); - //persist the partition plan so on restart we have the same plan to reuse - stepStatus.setNumPartitions(plan.getPartitions()); + calculateExecutionType(); + + validateNumberOfPartitions(); /* When true is specified, the partition count from the current run * is used and all results from past partitions are discarded. Any @@ -263,7 +316,7 @@ protected void invokeCoreStep() throws JobRestartException, JobStartException, J * rollbackPartitionedStep method is invoked during restart before any * partitions begin processing to provide a cleanup hook. */ - if (plan.getPartitionsOverride()) { + if (executionType == ExecutionType.RESTART_OVERRIDE) { if (this.partitionReducerProxy != null) { try { this.partitionReducerProxy.rollbackPartitionedStep(); @@ -303,9 +356,14 @@ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartExc PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId()); // Then build all the subjobs but do not start them yet - if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) { + if (executionType == ExecutionType.RESTART_NORMAL) { parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext); } else { + // This case includes RESTART_OVERRIDE and RESTART_AFTER_COMPLETION. + // + // So we're just going to create new "subjob" job instances in the DB in these cases, + // and we'll have to make sure we're dealing with the correct ones, say in a subsequent "normal" restart + // (of the current execution which is itself a restart) parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext); } @@ -324,6 +382,12 @@ private void executeAndWaitForCompletion() throws JobRestartException { int numCurrentCompleted = 0; int numCurrentSubmitted = 0; + + // All partitions have already completed on a previous execution. + if (numTotalForThisExecution == 0) { + return; + } + //Start up to to the max num we are allowed from the num threads attribute for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) { final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i); diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java index deaa029..eca7a66 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java +++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java @@ -185,6 +185,9 @@ public List buildNewParallelPartitions(final PartitionsB return batchWorkUnits; } + /* + * There are some assumptions that all partition subjobs have associated DB entries + */ @Override public List buildOnRestartParallelPartitions(final PartitionsBuilderConfig config, final JobContextImpl jc, final StepContextImpl sc) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException { @@ -201,7 +204,7 @@ public List buildOnRestartParallelPartitions(final Parti final Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance]; try { - final long execId = getMostRecentExecutionId(parallelJob); + final long execId = getMostRecentSubJobExecutionId(parallelJob); final RuntimeJobExecution jobExecution; try { jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps); @@ -244,15 +247,13 @@ public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBui return batchWork; } - private long getMostRecentExecutionId(final JSLJob jobModel) { + private long getMostRecentSubJobExecutionId(JSLJob jobModel) { - //There can only be one instance associated with a subjob's id since it is generated from an unique - //job instance id. So there should be no way to directly start a subjob with particular - final List instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2); + // Pick off the first, knowing the ordering. There could be more than one. + List instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 1); - // Maybe we should blow up on '0' too? - if (instanceIds.size() > 1) { - throw new IllegalStateException("Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up."); + if (instanceIds.size() == 0) { + throw new IllegalStateException("Did not find an entry for job name = " + jobModel.getId()); } final List partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0)); @@ -271,7 +272,7 @@ public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSp throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException { final JSLJob jobModel = config.getJobModel(); - final long execId = getMostRecentExecutionId(jobModel); + final long execId = getMostRecentSubJobExecutionId(jobModel); final RuntimeFlowInSplitExecution jobExecution; try { jobExecution = JobExecutionHelper.restartFlowInSplit(servicesManager, execId, jobModel);