Skip to content

Commit 3c7129c

Browse files
committed
[FLINK-21700][security] Add an option to disable credential retrieval on a secure cluster
1 parent 5954122 commit 3c7129c

File tree

5 files changed

+67
-13
lines changed

5 files changed

+67
-13
lines changed

docs/layouts/shortcodes/generated/security_auth_kerberos_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
</tr>
99
</thead>
1010
<tbody>
11+
<tr>
12+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
13+
<td style="word-wrap: break-word;">true</td>
14+
<td>Boolean</td>
15+
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
16+
</tr>
1117
<tr>
1218
<td><h5>security.kerberos.login.contexts</h5></td>
1319
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/security_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>List&lt;String&gt;</td>
1515
<td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>security.kerberos.krb5-conf.path</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ public class SecurityOptions {
108108
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
109109
+ " Kafka authentication)");
110110

111+
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
112+
public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
113+
key("security.kerberos.fetch.delegation-token")
114+
.booleanType()
115+
.defaultValue(true)
116+
.withDescription(
117+
"Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. "
118+
+ "Only HDFS and HBase are supported. It is used in Yarn deployments. "
119+
+ "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. "
120+
+ "If false, Flink will assume that the delegation tokens are managed outside of Flink. "
121+
+ "As a consequence, it will not fetch delegation tokens for HDFS and HBase. "
122+
+ "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, "
123+
+ "to handle delegation tokens.");
124+
111125
// ------------------------------------------------------------------------
112126
// ZooKeeper Security Options
113127
// ------------------------------------------------------------------------

flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,23 @@ private static LocalResource registerLocalResource(
197197
}
198198

199199
public static void setTokensFor(
200-
ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
200+
ContainerLaunchContext amContainer,
201+
List<Path> paths,
202+
Configuration conf,
203+
boolean obtainingDelegationTokens)
201204
throws IOException {
202205
Credentials credentials = new Credentials();
203-
// for HDFS
204-
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
205-
// for HBase
206-
obtainTokenForHBase(credentials, conf);
206+
207+
if (obtainingDelegationTokens) {
208+
LOG.info("Obtaining delegation tokens for HDFS and HBase.");
209+
// for HDFS
210+
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
211+
// for HBase
212+
obtainTokenForHBase(credentials, conf);
213+
} else {
214+
LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
215+
}
216+
207217
// for user
208218
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
209219

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
5151
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
5252
import org.apache.flink.runtime.util.HadoopUtils;
53+
import org.apache.flink.util.CollectionUtil;
5354
import org.apache.flink.util.FlinkException;
5455
import org.apache.flink.util.Preconditions;
5556
import org.apache.flink.util.ShutdownHookUtil;
@@ -62,7 +63,6 @@
6263
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
6364
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
6465

65-
import org.apache.commons.collections.ListUtils;
6666
import org.apache.hadoop.fs.FileSystem;
6767
import org.apache.hadoop.fs.Path;
6868
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -105,6 +105,7 @@
105105
import java.net.URI;
106106
import java.net.URLDecoder;
107107
import java.nio.charset.Charset;
108+
import java.util.ArrayList;
108109
import java.util.Collection;
109110
import java.util.Collections;
110111
import java.util.HashMap;
@@ -529,6 +530,19 @@ private ClusterClientProvider<ApplicationId> deployInternal(
529530
"Hadoop security with Kerberos is enabled but the login user "
530531
+ "does not have Kerberos credentials or delegation tokens!");
531532
}
533+
534+
final boolean fetchToken =
535+
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
536+
final boolean yarnAccessFSEnabled =
537+
!CollectionUtil.isNullOrEmpty(
538+
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
539+
if (!fetchToken && yarnAccessFSEnabled) {
540+
throw new IllegalConfigurationException(
541+
String.format(
542+
"When %s is disabled, %s must be disabled as well.",
543+
SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
544+
YarnConfigOptions.YARN_ACCESS.key()));
545+
}
532546
}
533547

534548
isReadyForDeployment(clusterSpecification);
@@ -1081,13 +1095,17 @@ private ApplicationReport startAppMaster(
10811095
if (UserGroupInformation.isSecurityEnabled()) {
10821096
// set HDFS delegation tokens when security is enabled
10831097
LOG.info("Adding delegation token to the AM container.");
1084-
List<Path> yarnAccessList =
1085-
ConfigUtils.decodeListFromConfig(
1086-
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1087-
Utils.setTokensFor(
1088-
amContainer,
1089-
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
1090-
yarnConfiguration);
1098+
final List<Path> pathsToObtainToken = new ArrayList<>();
1099+
boolean fetchToken =
1100+
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
1101+
if (fetchToken) {
1102+
List<Path> yarnAccessList =
1103+
ConfigUtils.decodeListFromConfig(
1104+
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
1105+
pathsToObtainToken.addAll(yarnAccessList);
1106+
pathsToObtainToken.addAll(fileUploader.getRemotePaths());
1107+
}
1108+
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
10911109
}
10921110

10931111
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());

0 commit comments

Comments
 (0)