From 3271de8064891f100d7a0c66bbece96c373cbac6 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Wed, 21 Apr 2021 18:46:46 +0800 Subject: [PATCH] [FLINK-22085][tests] Update TestUtils::tryExecute() to cancel the job after execution failure. --- .../org/apache/flink/test/util/TestUtils.java | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java index daf66d4bb3d57..60c6a180cd8d7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java @@ -18,41 +18,50 @@ package org.apache.flink.test.util; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; import static org.junit.Assert.fail; /** Test utilities. */ public class TestUtils { - public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) - throws Exception { + // Execute the job and wait for the job result synchronously. The method throws exception + // iff one of the following conditions happens: + // 1) The job finishes successfully without exception + // 2) The job finishes with an exception that contains SuccessException. + public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception { + JobClient jobClient = null; try { - return see.execute(name); - } catch (ProgramInvocationException | JobExecutionException root) { - Throwable cause = root.getCause(); - - // search for nested SuccessExceptions - int depth = 0; - while (!(cause instanceof SuccessException)) { - if (cause == null || depth++ == 20) { - root.printStackTrace(); - fail("Test failed: " + root.getMessage()); - } else { - cause = cause.getCause(); + StreamGraph graph = see.getStreamGraph(name); + jobClient = see.executeAsync(graph); + jobClient.getJobExecutionResult().get(); + } catch (Throwable root) { + if (jobClient != null) { + try { + jobClient.cancel().get(); + } catch (Exception e) { + // Exception could be thrown if the job has already finished. + // Ignore the exception. } } - } - return null; + Throwable t = root; + while (t != null && !(t instanceof SuccessException)) { + t = t.getCause(); + } + + if (t == null) { + root.printStackTrace(); + fail("Test failed: " + root.getMessage()); + } + } } public static void submitJobAndWaitForResult(