Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@
},
"parallelism" : {
"type" : "integer"
},
"jobId" : {
"type" : "any"
}
}
} </code>
Expand Down Expand Up @@ -411,6 +414,9 @@
"parallelism" : {
"type" : "integer"
},
"jobId" : {
"type" : "any"
},
"allowNonRestoredState" : {
"type" : "boolean"
},
Expand Down Expand Up @@ -2361,7 +2367,7 @@
"host" : {
"type" : "string"
},
"start_time" : {
"start-time" : {
"type" : "integer"
},
"end-time" : {
Expand Down Expand Up @@ -2399,6 +2405,9 @@
"type" : "boolean"
}
}
},
"start_time" : {
"type" : "integer"
}
}
}
Expand Down Expand Up @@ -3871,5 +3880,3 @@
</tr>
</tbody>
</table>

{% top %}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;

import javax.annotation.Nullable;

import java.net.URISyntaxException;
import java.net.URL;

Expand All @@ -54,7 +56,7 @@ public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism,
JobID jobID) throws ProgramInvocationException {
@Nullable JobID jobID) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
final FlinkPlan flinkPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RequestBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -34,15 +35,16 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class JarPlanRequestBody extends JarRequestBody {
JarPlanRequestBody() {
super(null, null, null, null);
super(null, null, null, null, null);
}

@JsonCreator
JarPlanRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism) {
super(entryClassName, programArguments, programArgumentsList, parallelism);
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) {
super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;

import javax.annotation.Nullable;

Expand All @@ -39,6 +44,7 @@ public abstract class JarRequestBody implements RequestBody {
static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = "programArgsList";
static final String FIELD_NAME_PARALLELISM = "parallelism";
static final String FIELD_NAME_JOB_ID = "jobId";

@JsonProperty(FIELD_NAME_ENTRY_CLASS)
@Nullable
Expand All @@ -56,20 +62,28 @@ public abstract class JarRequestBody implements RequestBody {
@Nullable
private Integer parallelism;

@JsonProperty(FIELD_NAME_JOB_ID)
@JsonDeserialize(using = JobIDDeserializer.class)
@JsonSerialize(using = JobIDSerializer.class)
@Nullable
private JobID jobId;

JarRequestBody() {
this(null, null, null, null);
this(null, null, null, null, null);
}

@JsonCreator
JarRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism) {
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) {
this.entryClassName = entryClassName;
this.programArguments = programArguments;
this.programArgumentsList = programArgumentsList;
this.parallelism = parallelism;
this.jobId = jobId;
}

@Nullable
Expand All @@ -95,4 +109,10 @@ public List<String> getProgramArgumentsList() {
public Integer getParallelism() {
return parallelism;
}

@Nullable
@JsonIgnore
public JobID getJobId() {
return jobId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RequestBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class JarRunRequestBody extends JarRequestBody {
private String savepointPath;

public JarRunRequestBody() {
this(null, null, null, null, null, null);
this(null, null, null, null, null, null, null);
}

@JsonCreator
Expand All @@ -55,9 +56,10 @@ public JarRunRequestBody(
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
super(entryClassName, programArguments, programArgumentsList, parallelism);
super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
Expand Down Expand Up @@ -69,12 +70,14 @@ public static class JarHandlerContext {
private final String entryClass;
private final List<String> programArgs;
private final int parallelism;
private final JobID jobId;

private JarHandlerContext(Path jarFile, String entryClass, List<String> programArgs, int parallelism) {
private JarHandlerContext(Path jarFile, String entryClass, List<String> programArgs, int parallelism, JobID jobId) {
this.jarFile = jarFile;
this.entryClass = entryClass;
this.programArgs = programArgs;
this.parallelism = parallelism;
this.jobId = jobId;
}

public static <R extends JarRequestBody> JarHandlerContext fromRequest(
Expand All @@ -100,7 +103,13 @@ public static <R extends JarRequestBody> JarHandlerContext fromRequest(
ExecutionConfig.PARALLELISM_DEFAULT,
log);

return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism);
JobID jobId = fromRequestBodyOrQueryParameter(
requestBody.getJobId(),
() -> null, // No support via query parameter
null, // Delegate default job ID to actual JobGraph generation
log);

return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism, jobId);
}

public JobGraph toJobGraph(Configuration configuration) {
Expand All @@ -114,7 +123,7 @@ public JobGraph toJobGraph(Configuration configuration) {
jarFile.toFile(),
entryClass,
programArgs.toArray(new String[0]));
return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);
return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism, jobId);
} catch (final ProgramInvocationException e) {
throw new CompletionException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
Expand Down Expand Up @@ -49,13 +50,17 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.fail;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/** Base test class for jar request handlers. */
public abstract class JarHandlerParameterTest
Expand Down Expand Up @@ -184,6 +189,25 @@ public void testConfigurationViaJsonRequestFailWithProgArgsAsStringAndList() thr
}
}

@Test
public void testProvideJobId() throws Exception {
JobID jobId = new JobID();

HandlerRequest<REQB, M> request = createRequest(
getJarRequestBodyWithJobId(jobId),
getUnresolvedJarMessageParameters(),
getUnresolvedJarMessageParameters(),
jarWithManifest
);

handleRequest(request);

Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();

assertThat(jobGraph.isPresent(), is(true));
assertThat(jobGraph.get().getJobID(), is(equalTo(jobId)));
}

private void testConfigurationViaJsonRequest(ProgramArgsParType programArgsParType) throws Exception {
handleRequest(createRequest(
getJarRequestBody(programArgsParType),
Expand Down Expand Up @@ -270,6 +294,8 @@ private static <X> List<String> getValuesAsString(MessageQueryParameter<X> param

abstract REQB getJarRequestBody(ProgramArgsParType programArgsParType);

abstract REQB getJarRequestBodyWithJobId(JobID jobId);

abstract void handleRequest(HandlerRequest<REQB, M> request) throws Exception;

JobGraph validateDefaultGraph() {
Expand All @@ -286,6 +312,10 @@ JobGraph validateGraph() {
return jobGraph;
}

private static Optional<JobGraph> getLastSubmittedJobGraphAndReset() {
return Optional.ofNullable(LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null));
}

private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
ExecutionConfig executionConfig;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand Down Expand Up @@ -103,7 +104,13 @@ JarPlanRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
ParameterProgram.class.getCanonicalName(),
getProgramArgsString(programArgsParType),
getProgramArgsList(programArgsParType),
PARALLELISM);
PARALLELISM,
null);
}

@Override
JarPlanRequestBody getJarRequestBodyWithJobId(JobID jobId) {
return new JarPlanRequestBody(null, null, null, null, jobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down Expand Up @@ -121,11 +122,17 @@ JarRunRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
getProgramArgsString(programArgsParType),
getProgramArgsList(programArgsParType),
PARALLELISM,
null,
ALLOW_NON_RESTORED_STATE_QUERY,
RESTORE_PATH
);
}

@Override
JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) {
return new JarRunRequestBody(null, null, null, null, jobId, null, null);
}

@Override
void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;

import java.util.Arrays;
Expand All @@ -41,6 +42,7 @@ protected JarRunRequestBody getTestRequestInstance() {
"world",
Arrays.asList("boo", "far"),
4,
new JobID(),
true,
"foo/bar"
);
Expand All @@ -54,6 +56,7 @@ protected void assertOriginalEqualsToUnmarshalled(
assertEquals(expected.getProgramArguments(), actual.getProgramArguments());
assertEquals(expected.getProgramArgumentsList(), actual.getProgramArgumentsList());
assertEquals(expected.getParallelism(), actual.getParallelism());
assertEquals(expected.getJobId(), actual.getJobId());
assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
}
Expand Down