Skip to content

Commit

Permalink
[Improve][Client] Chunjun client support submit task to hadoop with k…
Browse files Browse the repository at this point in the history
…erberos(#1435)

(cherry picked from commit 0784968)
  • Loading branch information
kyo-tom authored and OT-XY committed Mar 3, 2023
1 parent 17de82a commit bff63f9
Showing 1 changed file with 67 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;

Expand All @@ -46,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;

import static org.apache.flink.yarn.configuration.YarnConfigOptions.APPLICATION_ID;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.APPLICATION_QUEUE;
Expand All @@ -68,50 +71,71 @@ public ClusterClient submit(JobDeployer jobDeployer) throws Exception {
String yarnConfDir = launcherOptions.getHadoopConfDir();

try {
FileSystem.initialize(flinkConfig);

YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
yarnClient.init(yarnConf);
yarnClient.start();
ApplicationId applicationId;

if (StringUtils.isEmpty(flinkConfig.get(APPLICATION_ID))) {
applicationId = getAppIdFromYarn(yarnClient, flinkConfig);
if (applicationId == null || StringUtils.isEmpty(applicationId.toString())) {
throw new RuntimeException("No flink session found on yarn cluster.");
}
} else {
applicationId = ConverterUtils.toApplicationId(flinkConfig.get(APPLICATION_ID));
}

HighAvailabilityMode highAvailabilityMode =
HighAvailabilityMode.fromConfig(flinkConfig);
if (highAvailabilityMode.equals(HighAvailabilityMode.ZOOKEEPER)
&& applicationId != null) {
flinkConfig.setString(
HighAvailabilityOptions.HA_CLUSTER_ID, applicationId.toString());
}
try (YarnClusterDescriptor yarnClusterDescriptor =
new YarnClusterDescriptor(
flinkConfig,
yarnConf,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
true)) {
ClusterClient clusterClient =
yarnClusterDescriptor.retrieve(applicationId).getClusterClient();
JobGraph jobGraph =
JobGraphUtil.buildJobGraph(
launcherOptions, programArgs.toArray(new String[0]));
jobGraph.getClasspaths().clear();
jobGraph.getUserJars().clear();
jobGraph.getUserArtifacts().clear();
JobID jobID = (JobID) clusterClient.submitJob(jobGraph).get();
LOG.info("submit job successfully, jobID = {}", jobID);
return clusterClient;
}
}

SecurityUtils.install(new SecurityConfiguration(flinkConfig));
return SecurityUtils.getInstalledContext()
.runSecured(
new Callable<ClusterClient>() {
@Override
public ClusterClient call() throws Exception {
FileSystem.initialize(flinkConfig);
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
yarnClient.init(yarnConf);
yarnClient.start();
ApplicationId applicationId;

if (StringUtils.isEmpty(flinkConfig.get(APPLICATION_ID))) {
applicationId =
getAppIdFromYarn(yarnClient, flinkConfig);
if (applicationId == null
|| StringUtils.isEmpty(
applicationId.toString())) {
throw new RuntimeException(
"No flink session found on yarn cluster.");
}
} else {
applicationId =
ConverterUtils.toApplicationId(
flinkConfig.get(APPLICATION_ID));
}

HighAvailabilityMode highAvailabilityMode =
HighAvailabilityMode.fromConfig(flinkConfig);
if (highAvailabilityMode.equals(
HighAvailabilityMode.ZOOKEEPER)
&& applicationId != null) {
flinkConfig.setString(
HighAvailabilityOptions.HA_CLUSTER_ID,
applicationId.toString());
}
try (YarnClusterDescriptor yarnClusterDescriptor =
new YarnClusterDescriptor(
flinkConfig,
yarnConf,
yarnClient,
YarnClientYarnClusterInformationRetriever
.create(yarnClient),
true)) {
ClusterClient clusterClient =
yarnClusterDescriptor
.retrieve(applicationId)
.getClusterClient();
JobGraph jobGraph =
JobGraphUtil.buildJobGraph(
launcherOptions,
programArgs.toArray(new String[0]));
jobGraph.getClasspaths().clear();
jobGraph.getUserJars().clear();
jobGraph.getUserArtifacts().clear();
JobID jobID =
(JobID) clusterClient.submitJob(jobGraph).get();
LOG.info("submit job successfully, jobID = {}", jobID);
return clusterClient;
}
}
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit bff63f9

Please sign in to comment.