Skip to content

[FLINK-37711] Remove unused getJobGraph from PipelineExecutorUtils #26493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -21,15 +21,13 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
@@ -39,7 +37,6 @@

import javax.annotation.Nonnull;

import java.net.MalformedURLException;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,50 +46,6 @@
public class PipelineExecutorUtils {
private static final Logger LOG = LoggerFactory.getLogger(PipelineExecutorUtils.class);

/**
* Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
*
* @param pipeline the pipeline whose job graph we are computing.
* @param configuration the configuration with the necessary information such as jars and
* classpaths to be included, the parallelism of the job and potential savepoint settings
* used to bootstrap its state.
* @param userClassloader the classloader which can load user classes.
* @return the corresponding {@link JobGraph}.
*/
public static JobGraph getJobGraph(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull ClassLoader userClassloader)
throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);

final ExecutionConfigAccessor executionConfigAccessor =
ExecutionConfigAccessor.fromConfiguration(configuration);
final JobGraph jobGraph =
FlinkPipelineTranslationUtil.getJobGraph(
userClassloader,
pipeline,
configuration,
executionConfigAccessor.getParallelism());

configuration
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
.ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));

if (configuration.get(DeploymentOptions.ATTACHED)
&& configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
jobGraph.setInitialClientHeartbeatTimeout(
configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT).toMillis());
}

jobGraph.addJars(executionConfigAccessor.getJars());
jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

return jobGraph;
}

/**
* Notify the {@link DefaultJobCreatedEvent} to job status changed listeners.
*
Original file line number Diff line number Diff line change
@@ -28,8 +28,8 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
@@ -191,7 +191,7 @@ void testCreateWithUserLibDir() throws FlinkException {
}

@Test
void testJobGraphRetrieval() throws IOException, FlinkException, ProgramInvocationException {
void testJobGraphRetrieval() throws Exception {
final int parallelism = 42;
final JobID jobId = new JobID();

@@ -207,22 +207,21 @@ void testJobGraphRetrieval() throws IOException, FlinkException, ProgramInvocati
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());

final JobGraph jobGraph = retrieveJobGraph(retriever, configuration);
final StreamGraph streamGraph = retrieveStreamGraph(retriever, configuration);

assertThat(jobGraph.getName())
assertThat(streamGraph.getName())
.isEqualTo(
testJobEntryClassClasspathProvider.getJobClassName()
+ "-"
+ expectedSuffix);
assertThat(jobGraph.getSavepointRestoreSettings())
assertThat(streamGraph.getSavepointRestoreSettings())
.isEqualTo(SavepointRestoreSettings.none());
assertThat(jobGraph.getMaximumParallelism()).isEqualTo(parallelism);
assertThat(jobGraph.getJobID()).isEqualTo(jobId);
assertThat(streamGraph.getMaximumParallelism()).isEqualTo(parallelism);
assertThat(streamGraph.getJobID()).isEqualTo(jobId);
}

@Test
void testJobGraphRetrievalFromJar()
throws IOException, FlinkException, ProgramInvocationException {
void testJobGraphRetrievalFromJar() throws Exception {
final String expectedSuffix = "suffix";
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
@@ -232,18 +231,18 @@ void testJobGraphRetrievalFromJar()
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());

final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getName())
assertThat(streamGraph.getName())
.isEqualTo(
testJobEntryClassClasspathProvider.getJobClassName()
+ "-"
+ expectedSuffix);
}

@Test
void testParameterConsiderationForMultipleJobsOnSystemClasspath()
throws IOException, FlinkException, ProgramInvocationException {
void testParameterConsiderationForMultipleJobsOnSystemClasspath() throws Exception {
final String expectedSuffix = "suffix";
final PackagedProgramRetriever retrieverUnderTest =
// Both a class name is specified and a JAR "is" on the class path
@@ -254,15 +253,15 @@ void testParameterConsiderationForMultipleJobsOnSystemClasspath()
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());

final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getName())
assertThat(streamGraph.getName())
.isEqualTo(testJobEntryClassClasspathProvider.getJobClassName() + "-suffix");
}

@Test
void testSavepointRestoreSettings()
throws FlinkException, IOException, ProgramInvocationException {
void testSavepointRestoreSettings() throws Exception {
final Configuration configuration = new Configuration();
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath("foobar", true);
@@ -279,10 +278,10 @@ void testSavepointRestoreSettings()
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());

final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, configuration);
final StreamGraph streamGraph = retrieveStreamGraph(retrieverUnderTest, configuration);

assertThat(jobGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
assertThat(jobGraph.getJobID()).isEqualTo(jobId);
assertThat(streamGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
assertThat(streamGraph.getJobID()).isEqualTo(jobId);
}

@Test
@@ -394,17 +393,19 @@ void testWithJobClassAndMultipleEntryClassesOnSystemClasspath()
}

@Test
void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());

final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
@@ -414,17 +415,19 @@ void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
}

@Test
void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
singleEntryClassClasspathProvider.getJobClassName(),
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());

final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
@@ -450,32 +453,32 @@ void testRetrieveCorrectUserClasspathsWithPipelineClasspaths() throws Exception
singleEntryClassClasspathProvider.getJobClassName(),
ClasspathProviderExtension.parametersForTestJob("suffix"),
configuration);
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
assertThat(jobGraph.getClasspaths()).isEqualTo(expectedMergedURLs);
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());
assertThat(streamGraph.getClasspaths()).isEqualTo(expectedMergedURLs);
}

@Test
void testRetrieveFromJarFileWithoutUserLib()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromJarFileWithoutUserLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
null,
testJobEntryClassClasspathProvider.getJobJar(),
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getUserJars())
assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
assertThat(jobGraph.getClasspaths()).isEmpty();
assertThat(streamGraph.getClasspaths()).isEmpty();
}

@Test
void testRetrieveFromJarFileWithUserLib()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromJarFileWithUserLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
@@ -484,14 +487,17 @@ void testRetrieveFromJarFileWithUserLib()
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getUserJars())
assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
singleEntryClassClasspathProvider.getDirectory());
@@ -500,8 +506,7 @@ void testRetrieveFromJarFileWithUserLib()
}

@Test
void testRetrieveFromJarFileWithNonRootUserLib()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromJarFileWithNonRootUserLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory().getParentFile(),
@@ -511,14 +516,17 @@ void testRetrieveFromJarFileWithNonRootUserLib()
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getUserJars())
assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
singleEntryClassClasspathProvider.getDirectory());
@@ -527,8 +535,7 @@ void testRetrieveFromJarFileWithNonRootUserLib()
}

@Test
void testRetrieveFromJarFileWithSymlinkUserLib()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromJarFileWithSymlinkUserLib() throws Exception {
final File actualUsrLib = new File(symlinkClasspathProvider.getDirectory(), "usrlib");
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
@@ -539,14 +546,17 @@ void testRetrieveFromJarFileWithSymlinkUserLib()
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getUserJars())
assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(actualUsrLib);

@@ -555,8 +565,7 @@ void testRetrieveFromJarFileWithSymlinkUserLib()
}

@Test
void testRetrieveFromJarFileWithArtifacts()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromJarFileWithArtifacts() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
null,
@@ -567,14 +576,17 @@ void testRetrieveFromJarFileWithArtifacts()
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getUserJars())
assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
additionalArtifactClasspathProvider.getDirectory());
@@ -583,8 +595,7 @@ void testRetrieveFromJarFileWithArtifacts()
}

@Test
void testRetrieveFromJarFileWithUserAndArtifactLib()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromJarFileWithUserAndArtifactLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
@@ -595,14 +606,17 @@ void testRetrieveFromJarFileWithUserAndArtifactLib()
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

assertThat(jobGraph.getUserJars())
assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());
final List<String> expectedClasspath = new ArrayList<>();
expectedClasspath.addAll(
extractRelativizedURLsForJarsFromDirectory(
@@ -615,8 +629,7 @@ void testRetrieveFromJarFileWithUserAndArtifactLib()
}

@Test
void testRetrieveFromArtifactLibWithoutJarFile()
throws IOException, FlinkException, ProgramInvocationException {
void testRetrieveFromArtifactLibWithoutJarFile() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
null,
@@ -626,10 +639,13 @@ void testRetrieveFromArtifactLibWithoutJarFile()
multipleEntryClassesClasspathProvider.getJobClassName(),
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, new Configuration());

final List<String> actualClasspath =
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
streamGraph.getClasspaths().stream()
.map(URL::toString)
.collect(Collectors.toList());
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
multipleEntryClassesClasspathProvider.getDirectory());
@@ -682,9 +698,9 @@ void testConfigurationIsConsidered() throws FlinkException {
.isInstanceOf(FlinkUserCodeClassLoaders.ParentFirstClassLoader.class);
}

private JobGraph retrieveJobGraph(
private StreamGraph retrieveStreamGraph(
PackagedProgramRetriever retrieverUnderTest, Configuration configuration)
throws FlinkException, ProgramInvocationException, MalformedURLException {
throws Exception {
final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram();

final int defaultParallelism = configuration.get(CoreOptions.DEFAULT_PARALLELISM);
@@ -702,8 +718,8 @@ private JobGraph retrieveJobGraph(
final Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(
packagedProgram, configuration, defaultParallelism, false);
return PipelineExecutorUtils.getJobGraph(
pipeline, configuration, packagedProgram.getUserCodeClassLoader());

return PipelineExecutorUtils.getStreamGraph(pipeline, configuration);
}

private static List<String> extractRelativizedURLsForJarsFromDirectory(File directory)