diff --git a/docs/_includes/generated/rest_v1_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html index 2fd3dcfe40c9a..36ffe51b60648 100644 --- a/docs/_includes/generated/rest_v1_dispatcher.html +++ b/docs/_includes/generated/rest_v1_dispatcher.html @@ -321,6 +321,9 @@ }, "parallelism" : { "type" : "integer" + }, + "jobId" : { + "type" : "any" } } } @@ -411,6 +414,9 @@ "parallelism" : { "type" : "integer" }, + "jobId" : { + "type" : "any" + }, "allowNonRestoredState" : { "type" : "boolean" }, @@ -2361,7 +2367,7 @@ "host" : { "type" : "string" }, - "start_time" : { + "start-time" : { "type" : "integer" }, "end-time" : { @@ -2399,6 +2405,9 @@ "type" : "boolean" } } + }, + "start_time" : { + "type" : "integer" } } } @@ -3871,5 +3880,3 @@ - -{% top %} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 59ab40658048d..7572d752d82f6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -31,6 +31,8 @@ import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; +import javax.annotation.Nullable; + import java.net.URISyntaxException; import java.net.URL; @@ -54,7 +56,7 @@ public static JobGraph createJobGraph( PackagedProgram packagedProgram, Configuration configuration, int defaultParallelism, - JobID jobID) throws ProgramInvocationException { + @Nullable JobID jobID) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); final FlinkPlan flinkPlan; diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java index 8e209b41cc1bb..c30edb3f0e7c5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -34,7 +35,7 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class JarPlanRequestBody extends JarRequestBody { JarPlanRequestBody() { - super(null, null, null, null); + super(null, null, null, null, null); } @JsonCreator @@ -42,7 +43,8 @@ public class JarPlanRequestBody extends JarRequestBody { @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName, @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments, @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List programArgumentsList, - @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism) { - super(entryClassName, programArguments, programArgumentsList, parallelism); + @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) { + super(entryClassName, programArguments, programArgumentsList, parallelism, jobId); } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java index 144ac05058c3e..0d6dabb0ab01f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java @@ -18,12 +18,17 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; import javax.annotation.Nullable; @@ -39,6 +44,7 @@ public abstract class JarRequestBody implements RequestBody { static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs"; static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = "programArgsList"; static final String FIELD_NAME_PARALLELISM = "parallelism"; + static final String FIELD_NAME_JOB_ID = "jobId"; @JsonProperty(FIELD_NAME_ENTRY_CLASS) @Nullable @@ -56,8 +62,14 @@ public abstract class JarRequestBody implements RequestBody { @Nullable private Integer parallelism; + @JsonProperty(FIELD_NAME_JOB_ID) + @JsonDeserialize(using = JobIDDeserializer.class) + @JsonSerialize(using = JobIDSerializer.class) + @Nullable + private JobID jobId; + JarRequestBody() { - this(null, null, null, null); + this(null, null, null, null, null); } @JsonCreator @@ -65,11 +77,13 @@ public abstract class JarRequestBody implements RequestBody { @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName, @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments, @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List programArgumentsList, - @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism) { + @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) { this.entryClassName = entryClassName; this.programArguments = programArguments; this.programArgumentsList = programArgumentsList; this.parallelism = parallelism; + this.jobId = jobId; } @Nullable @@ -95,4 +109,10 @@ public List getProgramArgumentsList() { public Integer getParallelism() { return parallelism; } + + @Nullable + @JsonIgnore + public JobID getJobId() { + return jobId; + } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java index 9e4ee0f8c2ad2..913e223cb8512 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -46,7 +47,7 @@ public class JarRunRequestBody extends JarRequestBody { private String savepointPath; public JarRunRequestBody() { - this(null, null, null, null, null, null); + this(null, null, null, null, null, null, null); } @JsonCreator @@ -55,9 +56,10 @@ public JarRunRequestBody( @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments, @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List programArgumentsList, @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId, @Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState, @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) { - super(entryClassName, programArguments, programArgumentsList, parallelism); + super(entryClassName, programArguments, programArgumentsList, parallelism, jobId); this.allowNonRestoredState = allowNonRestoredState; this.savepointPath = savepointPath; } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java index 9026cf00c5efa..e90390dcf321b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; @@ -69,12 +70,14 @@ public static class JarHandlerContext { private final String entryClass; private final List programArgs; private final int parallelism; + private final JobID jobId; - private JarHandlerContext(Path jarFile, String entryClass, List programArgs, int parallelism) { + private JarHandlerContext(Path jarFile, String entryClass, List programArgs, int parallelism, JobID jobId) { this.jarFile = jarFile; this.entryClass = entryClass; this.programArgs = programArgs; this.parallelism = parallelism; + this.jobId = jobId; } public static JarHandlerContext fromRequest( @@ -100,7 +103,13 @@ public static JarHandlerContext fromRequest( ExecutionConfig.PARALLELISM_DEFAULT, log); - return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism); + JobID jobId = fromRequestBodyOrQueryParameter( + requestBody.getJobId(), + () -> null, // No support via query parameter + null, // Delegate default job ID to actual JobGraph generation + log); + + return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism, jobId); } public JobGraph toJobGraph(Configuration configuration) { @@ -114,7 +123,7 @@ public JobGraph toJobGraph(Configuration configuration) { jarFile.toFile(), entryClass, programArgs.toArray(new String[0])); - return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism); + return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism, jobId); } catch (final ProgramInvocationException e) { throw new CompletionException(e); } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java index 5c0ad507b4bb3..6939ba30c6359 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; @@ -49,6 +50,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +58,9 @@ import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.fail; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; /** Base test class for jar request handlers. */ public abstract class JarHandlerParameterTest @@ -184,6 +189,25 @@ public void testConfigurationViaJsonRequestFailWithProgArgsAsStringAndList() thr } } + @Test + public void testProvideJobId() throws Exception { + JobID jobId = new JobID(); + + HandlerRequest request = createRequest( + getJarRequestBodyWithJobId(jobId), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest + ); + + handleRequest(request); + + Optional jobGraph = getLastSubmittedJobGraphAndReset(); + + assertThat(jobGraph.isPresent(), is(true)); + assertThat(jobGraph.get().getJobID(), is(equalTo(jobId))); + } + private void testConfigurationViaJsonRequest(ProgramArgsParType programArgsParType) throws Exception { handleRequest(createRequest( getJarRequestBody(programArgsParType), @@ -270,6 +294,8 @@ private static List getValuesAsString(MessageQueryParameter param abstract REQB getJarRequestBody(ProgramArgsParType programArgsParType); + abstract REQB getJarRequestBodyWithJobId(JobID jobId); + abstract void handleRequest(HandlerRequest request) throws Exception; JobGraph validateDefaultGraph() { @@ -286,6 +312,10 @@ JobGraph validateGraph() { return jobGraph; } + private static Optional getLastSubmittedJobGraphAndReset() { + return Optional.ofNullable(LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null)); + } + private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) { ExecutionConfig executionConfig; try { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java index 1cf3af0f4769b..11944d42f2a9a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.rest.handler.HandlerRequest; @@ -103,7 +104,13 @@ JarPlanRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) { ParameterProgram.class.getCanonicalName(), getProgramArgsString(programArgsParType), getProgramArgsList(programArgsParType), - PARALLELISM); + PARALLELISM, + null); + } + + @Override + JarPlanRequestBody getJarRequestBodyWithJobId(JobID jobId) { + return new JarPlanRequestBody(null, null, null, null, jobId); } @Override diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java index e506b92e36ead..796c08b0b0eaf 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -121,11 +122,17 @@ JarRunRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) { getProgramArgsString(programArgsParType), getProgramArgsList(programArgsParType), PARALLELISM, + null, ALLOW_NON_RESTORED_STATE_QUERY, RESTORE_PATH ); } + @Override + JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) { + return new JarRunRequestBody(null, null, null, null, jobId, null, null); + } + @Override void handleRequest(HandlerRequest request) throws Exception { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java index 18fcd97d340ee..3dc65c9480bd6 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase; import java.util.Arrays; @@ -41,6 +42,7 @@ protected JarRunRequestBody getTestRequestInstance() { "world", Arrays.asList("boo", "far"), 4, + new JobID(), true, "foo/bar" ); @@ -54,6 +56,7 @@ protected void assertOriginalEqualsToUnmarshalled( assertEquals(expected.getProgramArguments(), actual.getProgramArguments()); assertEquals(expected.getProgramArgumentsList(), actual.getProgramArgumentsList()); assertEquals(expected.getParallelism(), actual.getParallelism()); + assertEquals(expected.getJobId(), actual.getJobId()); assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState()); assertEquals(expected.getSavepointPath(), actual.getSavepointPath()); }