diff --git a/README.md b/README.md
index 85ddad650..442d95c14 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,14 @@ At present, ***BitSail*** is mainly designed with the ELT model, which have EB d
- In Streaming mode, auto-detect the exists checkpoint and apply when job restart.
- ...
+## Requirements
+
+The latest version of BitSail has the following minimum requirements:
+
+- Java 8 or higher is required both to build and to run BitSail;
+- Maven 3.6 or higher;
+- Operating system: no specific requirements (tested on Windows and Linux).
+
## Support Connectors
diff --git a/bitsail-base/src/main/java/com/bytedance/bitsail/base/extension/SecurityModule.java b/bitsail-base/src/main/java/com/bytedance/bitsail/base/extension/SecurityModule.java
new file mode 100644
index 000000000..a7ee59caa
--- /dev/null
+++ b/bitsail-base/src/main/java/com/bytedance/bitsail/base/extension/SecurityModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.bytedance.bitsail.base.extension;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public interface SecurityModule extends Serializable { // Contract for a security module: configure, log in, log out. Implementations are not visible here.
+
+ void initializeModule(BitSailConfiguration securityConfiguration); // Hands the module its configuration; presumably called before login() — TODO confirm with callers.
+
+ void login() throws IOException; // Performs the login; may fail with an IOException.
+
+ void logout(); // Counterpart of login(); declares no checked exception.
+}
diff --git a/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgs.java b/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgs.java
index 93681d67a..abf280f26 100644
--- a/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgs.java
+++ b/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgs.java
@@ -46,4 +46,18 @@ public class BaseCommandArgs implements CommandArgs {
@Parameter(names = {"--conf"})
private String jobConf;
+
+ @Parameter(names = {"--enable-kerberos"})
+ private boolean enableKerberos = false;
+
+ @Parameter(names = {"--keytab-path"})
+ private String keytabPath;
+
+ @Parameter(names = {"--principal"})
+ private String principal;
+
+ @Parameter(names = {"--krb5-conf-path"})
+ private String krb5ConfPath;
+
+ private String[] unknownOptions;
}
diff --git a/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/engine/EngineRunner.java b/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/engine/EngineRunner.java
index b0a987520..f0abe8f6c 100644
--- a/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/engine/EngineRunner.java
+++ b/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/engine/EngineRunner.java
@@ -20,19 +20,22 @@
import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import java.io.IOException;
import java.io.Serializable;
+import java.net.URLClassLoader;
/**
* Created 2022/8/5
*/
public interface EngineRunner extends Serializable {
- default void addEngineClasspath() {
+ default void loadLibrary(URLClassLoader classLoader) {
return;
}
+ void initializeEngine(BitSailConfiguration sysConfiguration);
+
ProcessBuilder getProcBuilder(BitSailConfiguration jobConfiguration,
- BaseCommandArgs baseCommandArgs,
- String[] args);
+ BaseCommandArgs baseCommandArgs) throws IOException;
String engineName();
}
diff --git a/bitsail-clients/bitsail-client-api/src/test/java/com/bytedance/bitsail/client/api/command/KerberosCommandArgsParserTest.java b/bitsail-clients/bitsail-client-api/src/test/java/com/bytedance/bitsail/client/api/command/KerberosCommandArgsParserTest.java
new file mode 100644
index 000000000..5da5e295e
--- /dev/null
+++ b/bitsail-clients/bitsail-client-api/src/test/java/com/bytedance/bitsail/client/api/command/KerberosCommandArgsParserTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.client.api.command;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KerberosCommandArgsParserTest {
+
+  /**
+   * Kerberos path options are parsed, an unrecognised option is tolerated, and the
+   * {@code --enable-kerberos} flag stays {@code false} when it is not passed.
+   */
+  @Test
+  public void testParseKerberosArgs() {
+    String[] args = new String[]{
+      "--keytab-path", "/etc/kerberos/keytab",
+      "--principal", "test_user",
+      "--krb5-conf-path", "/etc/kerberos/krb5.conf",
+      "--unknownkey", "unknown_value"
+    };
+
+    BaseCommandArgs kerberosCommandArgs = new BaseCommandArgs();
+    CommandArgsParser.parseArguments(args, kerberosCommandArgs);
+
+    Assert.assertEquals("/etc/kerberos/keytab", kerberosCommandArgs.getKeytabPath());
+    Assert.assertEquals("test_user", kerberosCommandArgs.getPrincipal());
+    Assert.assertEquals("/etc/kerberos/krb5.conf", kerberosCommandArgs.getKrb5ConfPath());
+    Assert.assertFalse(kerberosCommandArgs.isEnableKerberos());
+  }
+
+  /**
+   * Passing {@code --enable-kerberos} turns the flag on without disturbing the other options.
+   */
+  @Test
+  public void testParseMoreArgs() {
+    String[] args = new String[]{
+      "--enable-kerberos",
+      "--keytab-path", "/root/dts_test/test.keytab",
+      "--principal", "admin/admin@HADOOP.COM",
+      "--krb5-conf-path", "/etc/krb5.conf"
+    };
+    BaseCommandArgs kerberosCommandArgs = new BaseCommandArgs();
+    CommandArgsParser.parseArguments(args, kerberosCommandArgs);
+
+    Assert.assertTrue(kerberosCommandArgs.isEnableKerberos());
+    // The values that follow the boolean flag must still bind to their own options.
+    Assert.assertEquals("/root/dts_test/test.keytab", kerberosCommandArgs.getKeytabPath());
+    Assert.assertEquals("admin/admin@HADOOP.COM", kerberosCommandArgs.getPrincipal());
+    Assert.assertEquals("/etc/krb5.conf", kerberosCommandArgs.getKrb5ConfPath());
+  }
+}
diff --git a/bitsail-clients/bitsail-client-entry-flink/pom.xml b/bitsail-clients/bitsail-client-entry-flink/pom.xml
index eebb8dfd3..43c02e596 100644
--- a/bitsail-clients/bitsail-client-entry-flink/pom.xml
+++ b/bitsail-clients/bitsail-client-entry-flink/pom.xml
@@ -32,11 +32,24 @@
${revision}
+
+ com.bytedance.bitsail
+ bitsail-component-security-kerberos
+ ${revision}
+ provided
+
+
org.apache.hadoop
hadoop-client
${hadoop.version}
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+
\ No newline at end of file
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/command/FlinkRunCommandArgs.java b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/command/FlinkRunCommandArgs.java
index 5d1c6e241..e56a655fe 100644
--- a/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/command/FlinkRunCommandArgs.java
+++ b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/command/FlinkRunCommandArgs.java
@@ -53,7 +53,7 @@ public class FlinkRunCommandArgs implements CommandArgs {
@SuppressWarnings("checkstyle:MagicNumber")
@Parameter(names = "--priority",
- description = "Specify the job's priority in resource manaegr, eg: yarn.")
+ description = "Specify the job's priority in resource manager, eg: yarn.")
private int priority = 5;
}
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunner.java b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunner.java
index 3bea6813b..3fbbf1925 100644
--- a/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunner.java
+++ b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunner.java
@@ -23,24 +23,32 @@
import com.bytedance.bitsail.client.api.engine.EngineRunner;
import com.bytedance.bitsail.client.api.utils.PackageResolver;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
-import com.bytedance.bitsail.common.configuration.BitSailSystemConfiguration;
import com.bytedance.bitsail.common.exception.CommonErrorCode;
import com.bytedance.bitsail.entry.flink.command.FlinkRunCommandArgs;
import com.bytedance.bitsail.entry.flink.configuration.FlinkRunnerConfigOptions;
import com.bytedance.bitsail.entry.flink.deployment.DeploymentSupplier;
import com.bytedance.bitsail.entry.flink.deployment.DeploymentSupplierFactory;
import com.bytedance.bitsail.entry.flink.savepoint.FlinkRunnerSavepointLoader;
+import com.bytedance.bitsail.entry.flink.security.FlinkSecurityHandler;
+import com.bytedance.bitsail.entry.flink.utils.FlinkPackageResolver;
import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Created 2022/8/5
@@ -52,39 +60,65 @@ public class FlinkEngineRunner implements EngineRunner {
private DeploymentSupplierFactory deploymentSupplierFactory;
- public FlinkEngineRunner() {
- deploymentSupplierFactory = new DeploymentSupplierFactory();
+ private BitSailConfiguration sysConfiguration;
+
+ private Path flinkDir;
+
+ @Override
+ public void initializeEngine(BitSailConfiguration sysConfiguration) {
+ this.deploymentSupplierFactory = new DeploymentSupplierFactory();
+ this.sysConfiguration = sysConfiguration;
+ this.flinkDir = Paths.get(sysConfiguration.getNecessaryOption(FlinkRunnerConfigOptions.FLINK_HOME, CommonErrorCode.CONFIG_ERROR));
+ LOG.info("Find flink dir = {} in System configuration.", flinkDir);
+ if (!Files.exists(flinkDir)) {
+ LOG.error("Flink dir = {} not exists in fact, plz check the system configuration.", flinkDir);
+ throw new IllegalArgumentException(String.format("Flink dir %s not exists.", flinkDir));
+ }
}
@Override
- public ProcessBuilder getProcBuilder(BitSailConfiguration jobConfiguration, BaseCommandArgs baseCommandArgs, String[] args) {
+  @SneakyThrows
+  public void loadLibrary(URLClassLoader classLoader) {
+    // Appends every entry of the Flink lib directory whose file name starts with
+    // FlinkPackageResolver.FLINK_LIB_DIST_JAR_NAME to the given class loader.
+    // URLClassLoader#addURL is not public, hence the reflective call; @SneakyThrows lets the
+    // checked I/O and reflection exceptions propagate without being declared.
+    // NOTE(review): setAccessible on a JDK-internal method may be rejected on newer Java versions — confirm.
+    Path flinkLibDir = FlinkPackageResolver.getFlinkLibDir(flinkDir);
+    LOG.info("Load flink library from path: {}.", flinkLibDir);
+
+    try (Stream<Path> libraries = Files.list(flinkLibDir)) {
+      List<Path> flinkRuntimeLibraries = libraries
+          .filter(library -> StringUtils.startsWith(library.getFileName().toString(), FlinkPackageResolver.FLINK_LIB_DIST_JAR_NAME))
+          .collect(Collectors.toList());
+      Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+      method.setAccessible(true);
+      for (Path runtimeLibrary : flinkRuntimeLibraries) {
+        // Path -> URI -> URL escapes characters that are illegal in URLs (e.g. spaces);
+        // the deprecated File#toURL() does not.
+        method.invoke(classLoader, runtimeLibrary.toUri().toURL());
+        LOG.info("Load flink runtime library {} to classpath.", runtimeLibrary);
+      }
+    }
+  }
+
+ @Override
+ public ProcessBuilder getProcBuilder(BitSailConfiguration jobConfiguration,
+ BaseCommandArgs baseCommandArgs) throws IOException {
String argsMainAction = baseCommandArgs.getMainAction();
switch (argsMainAction) {
case CommandAction.RUN_COMMAND:
- return getRunProcBuilder(jobConfiguration, baseCommandArgs, args);
+ return getRunProcBuilder(jobConfiguration, baseCommandArgs);
default:
throw new UnsupportedOperationException(String.format("Main action %s not support in flink engine.", argsMainAction));
}
}
- ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseCommandArgs baseCommandArgs, String[] args) {
+ ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration,
+ BaseCommandArgs baseCommandArgs) throws IOException {
FlinkRunCommandArgs flinkCommandArgs = new FlinkRunCommandArgs();
- String[] unknownArgs = CommandArgsParser.parseArguments(args, flinkCommandArgs);
+ CommandArgsParser.parseArguments(baseCommandArgs.getUnknownOptions(), flinkCommandArgs);
- BitSailConfiguration sysConfiguration = BitSailSystemConfiguration.loadSysConfiguration();
DeploymentSupplier deploymentSupplier = deploymentSupplierFactory.getDeploymentSupplier(flinkCommandArgs,
jobConfiguration);
ProcessBuilder flinkProcBuilder = new ProcessBuilder();
List flinkCommands = Lists.newArrayList();
- String flinkDir = sysConfiguration.getNecessaryOption(FlinkRunnerConfigOptions.FLINK_HOME, CommonErrorCode.CONFIG_ERROR);
- LOG.info("Find flink dir = {} in System configuration.", flinkDir);
- if (!Files.exists(Paths.get(flinkDir))) {
- LOG.error("Flink dir = {} not exists in fact, plz check the system configuration.", flinkDir);
- throw new IllegalArgumentException(String.format("Flink dir %s not exists.", flinkDir));
- }
flinkCommands.add(flinkDir + "/bin/flink");
flinkCommands.add(flinkCommandArgs.getExecutionMode());
deploymentSupplier.addDeploymentCommands(baseCommandArgs, flinkCommands);
@@ -95,14 +129,12 @@ ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseComm
flinkCommandArgs,
flinkCommands);
- flinkCommands.add("-D");
- flinkCommands.add("execution.attached=" + !baseCommandArgs.isDetach());
if (!baseCommandArgs.isDetach()) {
flinkCommands.add("-sae");
+ } else {
+ flinkCommands.add("--detached");
}
- flinkCommands.addAll(Arrays.asList(unknownArgs));
-
if (sysConfiguration.fieldExists(FlinkRunnerConfigOptions.FLINK_DEFAULT_PROPERTIES)) {
for (Map.Entry property : sysConfiguration.getFlattenMap(FlinkRunnerConfigOptions.FLINK_DEFAULT_PROPERTIES.key()).entrySet()) {
LOG.info("Add System property {} = {}.", property.getKey(), property.getValue());
@@ -122,6 +154,8 @@ ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseComm
flinkCommands.add(baseCommandArgs.getJobConf());
flinkProcBuilder.command(flinkCommands);
+
+ FlinkSecurityHandler.processSecurity(sysConfiguration, flinkProcBuilder, flinkDir);
return flinkProcBuilder;
}
@@ -129,4 +163,5 @@ ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseComm
public String engineName() {
return "flink";
}
+
}
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/security/FlinkSecurityHandler.java b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/security/FlinkSecurityHandler.java
new file mode 100644
index 000000000..e6c7530e6
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/security/FlinkSecurityHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.entry.flink.security;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.component.format.security.kerberos.option.KerberosOptions;
+import com.bytedance.bitsail.entry.flink.utils.FlinkPackageResolver;
+
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Client-side helper that rewrites the Flink configuration for a Kerberos-enabled launch.
+ */
+public class FlinkSecurityHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSecurityHandler.class);
+
+  private static final String CONTEXT_CLIENT = "Client";
+  private static final String ENV_PROP_FLINK_CONF_DIR = "FLINK_CONF_DIR";
+  private static final String TMP_CONF_DIR_PREFIX = "bitsail-flink-conf-";
+
+  /**
+   * Prepares a Kerberos-enabled Flink configuration for the process to launch.
+   *
+   * <p>Returns immediately unless {@code KerberosOptions.KERBEROS_ENABLE} is set. Otherwise the
+   * Flink configuration found under {@code flinkDir} is loaded, the keytab, principal, krb5 path
+   * and login contexts are overridden, the result is written to a temporary conf directory, the
+   * log configuration files are linked next to it, and {@code FLINK_CONF_DIR} in the environment
+   * of {@code processBuilder} is pointed at that directory.
+   *
+   * @param sysConfiguration system configuration carrying the Kerberos options
+   * @param processBuilder builder whose environment receives {@code FLINK_CONF_DIR}
+   * @param flinkDir Flink home directory
+   * @throws IOException if the temporary configuration cannot be written or linked
+   */
+  public static void processSecurity(BitSailConfiguration sysConfiguration,
+                                     ProcessBuilder processBuilder,
+                                     Path flinkDir) throws IOException {
+    if (!sysConfiguration.get(KerberosOptions.KERBEROS_ENABLE)) {
+      return;
+    }
+    LOG.info("Kerberos (global) is enabled.");
+    String principal = sysConfiguration.get(KerberosOptions.KERBEROS_PRINCIPAL);
+    String keytabPath = sysConfiguration.get(KerberosOptions.KERBEROS_KEYTAB_PATH);
+    String krb5Path = sysConfiguration.get(KerberosOptions.KERBEROS_KRB5_CONF_PATH);
+
+    Configuration flinkConfiguration = loadFlinkConfiguration(FlinkPackageResolver.getFlinkConfDir(flinkDir));
+
+    flinkConfiguration.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+    flinkConfiguration.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
+    flinkConfiguration.setString("security.kerberos.krb5-conf.path", krb5Path);
+    flinkConfiguration.set(SecurityOptions.KERBEROS_LOGIN_CONTEXTS,
+        mergeLoginContexts(flinkConfiguration.get(SecurityOptions.KERBEROS_LOGIN_CONTEXTS)));
+
+    Path tmpFlinkConfDir = writeConfToTmpFile(flinkConfiguration);
+    symbolicLinkFlinkLog4j(flinkDir, tmpFlinkConfDir);
+    exposeFlinkConfDir(processBuilder, tmpFlinkConfDir);
+  }
+
+  /**
+   * Adds the {@code Client} login context to a comma-separated list of contexts.
+   *
+   * <p>Entries are trimmed, blank entries are dropped and first-seen order is kept, so
+   * {@code "KafkaClient, Client"} yields {@code "KafkaClient,Client"}.
+   *
+   * @param loginContexts existing value, may be {@code null} or empty
+   * @return the list containing {@code Client} exactly once
+   */
+  static String mergeLoginContexts(String loginContexts) {
+    if (StringUtils.isEmpty(loginContexts)) {
+      return CONTEXT_CLIENT;
+    }
+    Set<String> contextSet = Arrays.stream(loginContexts.split(","))
+        .map(String::trim)
+        .filter(StringUtils::isNotEmpty)
+        .collect(Collectors.toCollection(LinkedHashSet::new));
+    contextSet.add(CONTEXT_CLIENT);
+    return String.join(",", contextSet);
+  }
+
+  /** Loads the Flink configuration found in {@code flinkConfDir}. */
+  static Configuration loadFlinkConfiguration(Path flinkConfDir) {
+    LOG.info("Load flink configuration from path: {}.", flinkConfDir);
+    return GlobalConfiguration.loadConfiguration(flinkConfDir.toString());
+  }
+
+  /**
+   * Writes every key of {@code flinkConfiguration} as a {@code key: value} line into a
+   * Flink conf file placed inside a freshly created temporary directory.
+   *
+   * @param flinkConfiguration configuration to dump
+   * @return the directory that contains the written file
+   * @throws IOException if the directory or file cannot be created or written
+   */
+  public static Path writeConfToTmpFile(Configuration flinkConfiguration) throws IOException {
+    // Guava's Files.createTempDir() is deprecated in favour of the java.nio call used here.
+    File tmpDir = java.nio.file.Files.createTempDirectory(TMP_CONF_DIR_PREFIX).toFile();
+    // Registered first so it is removed last: deleteOnExit() deletes in reverse order.
+    tmpDir.deleteOnExit();
+
+    File tmpFlinkConf = Paths.get(tmpDir.getPath(),
+        UUID.randomUUID().toString(),
+        FlinkPackageResolver.FLINK_CONF_FILE).toFile();
+
+    if (tmpFlinkConf.exists()) {
+      FileUtils.deleteQuietly(tmpFlinkConf);
+    }
+    LOG.info("Creating new tmp flink-conf file in path {}", tmpFlinkConf.toPath());
+    Path tmpFlinkConfDir = Paths.get(tmpFlinkConf.getParent());
+    Files.createParentDirs(tmpFlinkConf);
+    //register delete on exit.
+    tmpFlinkConfDir.toFile().deleteOnExit();
+    tmpFlinkConf.createNewFile();
+    tmpFlinkConf.deleteOnExit();
+
+    try (FileWriter fileWriter = new FileWriter(tmpFlinkConf);
+         PrintWriter printWriter = new PrintWriter(fileWriter)) {
+      for (String key : flinkConfiguration.keySet()) {
+        String value = flinkConfiguration.getString(key, null);
+        printWriter.print(key);
+        printWriter.print(": ");
+        printWriter.println(value);
+      }
+      // PrintWriter swallows I/O errors; checkError() flushes and reports them.
+      if (printWriter.checkError()) {
+        throw new IOException(String.format("Failed to write flink conf to file %s.", tmpFlinkConf));
+      }
+      LOG.info("Success to write flink conf to file.");
+      return tmpFlinkConfDir;
+    }
+  }
+
+  /**
+   * Flink looks up its log4j/logback configuration in the directory named by the
+   * {@code FLINK_CONF_DIR} environment variable, so once that variable points at the temporary
+   * conf dir, the files starting with {@code FLINK_LOG_FILE_PREFIX} are linked into it as well.
+   */
+  private static void symbolicLinkFlinkLog4j(Path flinkDir, Path tmpFlinkConfDir) throws IOException {
+    Path flinkConfDir = FlinkPackageResolver.getFlinkConfDir(flinkDir);
+    try (Stream<Path> flinkLogConfPath = java.nio.file.Files.list(flinkConfDir)) {
+      List<Path> flinkLogConfPaths = flinkLogConfPath
+          .filter(file -> file.getFileName().toString().startsWith(FlinkPackageResolver.FLINK_LOG_FILE_PREFIX))
+          .collect(Collectors.toList());
+      for (Path flinkLogConf : flinkLogConfPaths) {
+        Path resolve = tmpFlinkConfDir.resolve(flinkLogConf.getFileName());
+        LOG.info("Create flink log symbolic link from {} to {}.", flinkLogConf, resolve);
+        java.nio.file.Files.createSymbolicLink(resolve, flinkLogConf);
+        resolve.toFile().deleteOnExit();
+      }
+    }
+  }
+
+  /**
+   * Sets {@code FLINK_CONF_DIR} in the environment of {@code procBuilder}; does nothing
+   * when {@code tmpFlinkConfDir} is {@code null}.
+   */
+  public static void exposeFlinkConfDir(ProcessBuilder procBuilder,
+                                        Path tmpFlinkConfDir) {
+    if (Objects.nonNull(tmpFlinkConfDir)) {
+      Map<String, String> envProps = procBuilder.environment();
+      envProps.put(ENV_PROP_FLINK_CONF_DIR, tmpFlinkConfDir.toString());
+      LOG.info("Set env prop in procBuilder: {}={}", ENV_PROP_FLINK_CONF_DIR, tmpFlinkConfDir);
+    }
+  }
+}
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/utils/FlinkPackageResolver.java b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/utils/FlinkPackageResolver.java
new file mode 100644
index 000000000..d98164ec0
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/main/java/com/bytedance/bitsail/entry/flink/utils/FlinkPackageResolver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.bytedance.bitsail.entry.flink.utils;
+
+import java.nio.file.Path;
+
+public class FlinkPackageResolver { // Constants and path helpers for locating parts of a Flink directory.
+
+ private static final String FLINK_LIB_DIR = "./lib"; // resolved against the root dir by getFlinkLibDir
+ private static final String FLINK_CONF_DIR = "./conf"; // resolved against the root dir by getFlinkConfDir
+ public static final String FLINK_CONF_FILE = "flink-conf.yaml"; // file name of the Flink configuration file
+ public static final String FLINK_LIB_DIST_JAR_NAME = "flink-dist"; // presumably a file-name prefix for the dist jar — confirm against callers
+ public static final String FLINK_LOG_FILE_PREFIX = "log4j"; // presumably a file-name prefix for log configuration files — confirm against callers
+
+ public static Path getFlinkConfDir(Path rootDir) { // returns rootDir resolved with "./conf"; the result is not normalized
+ return rootDir.resolve(FLINK_CONF_DIR);
+ }
+
+ public static Path getFlinkLibDir(Path rootDir) { // returns rootDir resolved with "./lib"; the result is not normalized
+ return rootDir.resolve(FLINK_LIB_DIR);
+ }
+}
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/main/resources/flink-conf.yaml b/bitsail-clients/bitsail-client-entry-flink/src/main/resources/flink-conf.yaml
new file mode 100644
index 000000000..b58b820a7
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/main/resources/flink-conf.yaml
@@ -0,0 +1,256 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1600m
+
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1728m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that do not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
+#
+# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: filesystem
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+# The port to which the REST client connects. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect.
+#
+#rest.address: 0.0.0.0
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST & web server binds to
+#
+#rest.bind-address: 0.0.0.0
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn or Mesos, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts receive the Kerberos credentials.
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunnerTest.java b/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunnerTest.java
index 0e176f803..bd62f5475 100644
--- a/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunnerTest.java
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/engine/FlinkEngineRunnerTest.java
@@ -19,7 +19,9 @@
import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.configuration.BitSailSystemConfiguration;
import com.bytedance.bitsail.common.configuration.ConfigParser;
+import com.bytedance.bitsail.entry.flink.configuration.FlinkRunnerConfigOptions;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
@@ -32,6 +34,8 @@
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
@@ -45,7 +49,7 @@ public class FlinkEngineRunnerTest {
public EnvironmentVariables variables = new EnvironmentVariables();
private BaseCommandArgs baseCommandArgs;
- private BitSailConfiguration globalConfiguration;
+ private BitSailConfiguration jobConfiguration;
@Before
public void before() throws URISyntaxException, IOException {
@@ -58,20 +62,36 @@ public void before() throws URISyntaxException, IOException {
properties.put("blob.fetch.num-concurrent", "32");
baseCommandArgs.setProperties(properties);
- globalConfiguration = ConfigParser.fromRawConfPath(baseCommandArgs.getJobConf());
+ jobConfiguration = ConfigParser.fromRawConfPath(baseCommandArgs.getJobConf());
File file = new File("/tmp/embedded/flink/bin/flink");
Files.createParentDirs(file);
}
@Test
- public void testGetFlinkProcBuilder() {
+ public void testGetFlinkProcBuilder() throws IOException {
String[] flinkRunCommandArgs = new String[] {"--execution-mode", "run", "--queue", "default", "--deployment-mode", "yarn-per-job"};
+ // The engine arguments are now handed over through the command args' unknown options
+ // rather than as a separate getRunProcBuilder parameter.
+ baseCommandArgs.setUnknownOptions(flinkRunCommandArgs);
+ BitSailConfiguration sysConfiguration = BitSailSystemConfiguration.loadSysConfiguration();
FlinkEngineRunner flinkEngineRunner = new FlinkEngineRunner();
+ // The runner is initialized with the system configuration before the process builder is requested.
+ flinkEngineRunner.initializeEngine(sysConfiguration);
ProcessBuilder runProcBuilder = flinkEngineRunner
- .getRunProcBuilder(globalConfiguration, baseCommandArgs, flinkRunCommandArgs);
+ .getRunProcBuilder(jobConfiguration, baseCommandArgs);
List command = runProcBuilder.command();
- Assert.assertEquals(64, command.size());
+ // The generated command line is expected to consist of 62 tokens (previously 64).
+ Assert.assertEquals(62, command.size());
+ }
+
+  /**
+   * Smoke test for {@code FlinkEngineRunner#loadLibrary}: initializes the runner with FLINK_HOME
+   * pointing at the root of the test classpath and invokes {@code loadLibrary} on an empty
+   * {@link URLClassLoader}.
+   *
+   * NOTE(review): the test resources contain a {@code lib/} directory; presumably that is what
+   * loadLibrary scans relative to FLINK_HOME - confirm against the implementation.
+   */
+  @Test
+  public void testLoadLibrary() throws URISyntaxException, IOException {
+    FlinkEngineRunner flinkEngineRunner = new FlinkEngineRunner();
+    String path = Paths.get(FlinkEngineRunnerTest.class.getClassLoader().getResource("").toURI()).toString();
+    BitSailConfiguration sysConfiguration = BitSailConfiguration.newDefault();
+    sysConfiguration.set(FlinkRunnerConfigOptions.FLINK_HOME, path);
+    flinkEngineRunner.initializeEngine(sysConfiguration);
+    // URLClassLoader is Closeable; close it deterministically so the test does not leak the loader.
+    try (URLClassLoader urlClassLoader = new URLClassLoader(new URL[] {})) {
+      flinkEngineRunner.loadLibrary(urlClassLoader);
+      URL[] urLs = urlClassLoader.getURLs();
+      Assert.assertNotNull(urLs);
+    }
}
}
\ No newline at end of file
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/security/FlinkSecurityHandlerTest.java b/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/security/FlinkSecurityHandlerTest.java
new file mode 100644
index 000000000..5e4a468cb
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/java/com/bytedance/bitsail/entry/flink/security/FlinkSecurityHandlerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.entry.flink.security;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.component.format.security.kerberos.option.KerberosOptions;
+
+import lombok.SneakyThrows;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link FlinkSecurityHandler}.
+ *
+ * The tests load {@code flink-conf.yaml} from conf directories on the test classpath, write a
+ * loaded configuration to a temporary conf directory, and check that the Kerberos settings of a
+ * BitSail system configuration end up in the Flink configuration referenced by FLINK_CONF_DIR.
+ */
+public class FlinkSecurityHandlerTest {
+
+  /** The default test conf directory declares {@code parallelism.default: 1}. */
+  @Test
+  public void testDefaultConf() {
+    Path path = getResourcePath("conf");
+    Configuration flinkConfiguration = FlinkSecurityHandler.loadFlinkConfiguration(path);
+    Assert.assertEquals("1", flinkConfiguration.getString("parallelism.default", null));
+  }
+
+  /** The alternative conf directory under {@code test_dir} declares {@code parallelism.default: 2}. */
+  @Test
+  public void testUserDefinedConf() {
+    Path path = getResourcePath("test_dir/conf");
+    Configuration flinkConfiguration = FlinkSecurityHandler.loadFlinkConfiguration(path);
+    Assert.assertEquals("2", flinkConfiguration.getString("parallelism.default", null));
+  }
+
+  /** Writing a configuration must produce a {@code flink-conf.yaml} inside the returned directory. */
+  @Test
+  public void testWriteFlinkConf() throws IOException {
+    Path path = getResourcePath("conf");
+    Configuration conf = FlinkSecurityHandler.loadFlinkConfiguration(path);
+    Path tmpConfDir = FlinkSecurityHandler.writeConfToTmpFile(conf);
+
+    File tmpConfFile = tmpConfDir.resolve("flink-conf.yaml").toFile();
+    Assert.assertTrue(tmpConfFile.exists());
+  }
+
+  /**
+   * With Kerberos enabled in the system configuration, {@code processSecurity} must export
+   * FLINK_CONF_DIR on the process builder, and the Flink configuration loaded from that directory
+   * must carry the configured keytab path and principal.
+   */
+  @Test
+  public void testHandleGlobal() throws IOException {
+    String workingDir = getResourcePath("").toString();
+
+    BitSailConfiguration sysConfiguration = BitSailConfiguration.newDefault();
+    sysConfiguration.set(KerberosOptions.KERBEROS_ENABLE, true);
+    sysConfiguration.set(KerberosOptions.KERBEROS_KEYTAB_PATH, Paths.get(workingDir, "test.keytab").toString());
+    sysConfiguration.set(KerberosOptions.KERBEROS_PRINCIPAL, "test_principal");
+    sysConfiguration.set(KerberosOptions.KERBEROS_KRB5_CONF_PATH, (Paths.get(workingDir, "krb5.conf").toString()));
+
+    ProcessBuilder processBuilder = new ProcessBuilder();
+    FlinkSecurityHandler.processSecurity(sysConfiguration, processBuilder, Paths.get(workingDir));
+
+    Map<String, String> environment = processBuilder.environment();
+    // Check the raw environment value first: Paths.get(null) would throw before any assertion ran.
+    String flinkConfDirValue = environment.get("FLINK_CONF_DIR");
+    Assert.assertNotNull(flinkConfDirValue);
+    Path flinkConfDir = Paths.get(flinkConfDirValue);
+    // The existence check must be asserted; a bare Files.exists(...) call verifies nothing.
+    Assert.assertTrue(Files.exists(flinkConfDir));
+
+    Configuration flinkConfiguration = FlinkSecurityHandler.loadFlinkConfiguration(flinkConfDir);
+    // JUnit's contract is assertEquals(expected, actual): the system configuration is the expectation.
+    Assert.assertEquals(sysConfiguration.get(KerberosOptions.KERBEROS_KEYTAB_PATH),
+        flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
+    Assert.assertEquals(sysConfiguration.get(KerberosOptions.KERBEROS_PRINCIPAL),
+        flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));
+  }
+
+  /**
+   * Resolves a resource name against this test class's class loader and returns it as a file
+   * system path.
+   *
+   * @param resource resource name relative to the classpath root; the empty string yields the root
+   * @return the path of the resource
+   */
+  @SneakyThrows
+  private Path getResourcePath(String resource) {
+    return Paths.get(FlinkSecurityHandlerTest.class
+        .getClassLoader()
+        .getResource(resource)
+        .toURI()
+    );
+  }
+}
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/resources/bitsail/hdfs_to_print.json b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/bitsail/hdfs_to_print.json
new file mode 100644
index 000000000..53b5402f9
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/bitsail/hdfs_to_print.json
@@ -0,0 +1,49 @@
+{
+ "job": {
+ "common": {
+ "cid": 0,
+ "domain": "test",
+ "job_id": -2501,
+ "job_name": "bitsail_hdfs_to_print",
+ "instance_id": -20501,
+ "user_name": "user",
+ "kerberos": {
+ "keytab_path": "/root/test/test.keytab",
+ "principal": "test/test@HADOOP.COM",
+ "krb5_conf_path": "/etc/krb5.conf"
+ }
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopInputFormat",
+ "hadoop_inputformat_class": "org.apache.hadoop.mapred.TextInputFormat",
+ "path_list": "hdfs://test_namesergice/user/test/test_data.json",
+ "content_type":"json",
+ "columns": [
+ {
+ "name":"id",
+ "type":"int"
+ },
+ {
+ "name":"field_str",
+ "type":"string"
+ },
+ {
+ "name":"field_bigint",
+ "type":"bigint"
+ },
+ {
+ "name":"field_float",
+ "type":"double"
+ },
+ {
+ "name":"field_date",
+ "type":"date"
+ }
+ ],
+ "reader_parallelism_num":1
+ },
+ "writer": {
+ "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink"
+ }
+ }
+}
\ No newline at end of file
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/resources/conf/flink-conf.yaml b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/conf/flink-conf.yaml
new file mode 100644
index 000000000..b58b820a7
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/conf/flink-conf.yaml
@@ -0,0 +1,256 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1600m
+
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1728m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
+#
+# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: filesystem
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+# The port to which the REST client connects. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect.
+#
+#rest.address: 0.0.0.0
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST & web server binds to
+#
+#rest.bind-address: 0.0.0.0
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn or Mesos, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The options below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts receive the Kerberos credentials.
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/resources/conf/log4j.properties b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/conf/log4j.properties
new file mode 100644
index 000000000..65b48d4d7
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/conf/log4j.properties
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/resources/lib/file_a b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/lib/file_a
new file mode 100644
index 000000000..65b48d4d7
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/lib/file_a
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/resources/lib/flink-dist-a b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/lib/flink-dist-a
new file mode 100644
index 000000000..65b48d4d7
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/lib/flink-dist-a
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/bitsail-clients/bitsail-client-entry-flink/src/test/resources/test_dir/conf/flink-conf.yaml b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/test_dir/conf/flink-conf.yaml
new file mode 100644
index 000000000..15afe520d
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry-flink/src/test/resources/test_dir/conf/flink-conf.yaml
@@ -0,0 +1,256 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1600m
+
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1728m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 2
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
+#
+# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: filesystem
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+# The port to which the REST client connects. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect.
+#
+#rest.address: 0.0.0.0
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST & web server binds to
+#
+#rest.bind-address: 0.0.0.0
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn or Mesos, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The options below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts receive the Kerberos credentials.
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+
diff --git a/bitsail-clients/bitsail-client-entry/pom.xml b/bitsail-clients/bitsail-client-entry/pom.xml
index 697d61fe9..e43b2dd18 100644
--- a/bitsail-clients/bitsail-client-entry/pom.xml
+++ b/bitsail-clients/bitsail-client-entry/pom.xml
@@ -31,6 +31,24 @@
bitsail-client-api
${revision}
+
+
+ com.bytedance.bitsail
+ bitsail-component-security-kerberos
+ ${revision}
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ log4j
+ log4j
+
+
+
diff --git a/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/Entry.java b/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/Entry.java
index 7a52d8722..c4e879040 100644
--- a/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/Entry.java
+++ b/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/Entry.java
@@ -18,11 +18,13 @@
package com.bytedance.bitsail.client.entry;
import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
-import com.bytedance.bitsail.client.api.command.BaseCommandArgsWithUnknownOptions;
import com.bytedance.bitsail.client.api.command.CommandAction;
import com.bytedance.bitsail.client.api.command.CommandArgsParser;
import com.bytedance.bitsail.client.api.engine.EngineRunner;
+import com.bytedance.bitsail.client.entry.constants.EntryConstants;
+import com.bytedance.bitsail.client.entry.security.SecurityContextFactory;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.configuration.BitSailSystemConfiguration;
import com.bytedance.bitsail.common.configuration.ConfigParser;
import com.beust.jcommander.internal.Maps;
@@ -31,6 +33,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
@@ -45,12 +48,14 @@ public class Entry {
private static final Map RUNNERS = Maps.newHashMap();
private static final Class ENGINE_SPI_CLASS = EngineRunner.class;
- private static final int ERROR_EXIT_CODE = 1;
-
private static final Object LOCK = new Object();
private static Process process;
private static volatile boolean running;
+ private final URLClassLoader classLoader;
+ private final BitSailConfiguration sysConfiguration;
+ private final BaseCommandArgs baseCommandArgs;
+
private static void loadAllEngines() {
for (EngineRunner runner : ServiceLoader.load(ENGINE_SPI_CLASS)) {
String engineName = runner.engineName();
@@ -59,67 +64,86 @@ private static void loadAllEngines() {
}
}
- public static void main(String[] args) {
+ private Entry(BitSailConfiguration sysConfiguration,
+ BaseCommandArgs baseCommandArgs) {
+ this.sysConfiguration = sysConfiguration;
+ this.baseCommandArgs = baseCommandArgs;
+ this.classLoader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
loadAllEngines();
+ }
+
+ @SuppressWarnings("checkstyle:EmptyLineSeparator")
+ public static void main(String[] args) {
+ //load system configuration.
+ BitSailConfiguration sysConfiguration = BitSailSystemConfiguration.loadSysConfiguration();
+
+ //load command arguments.
+ BaseCommandArgs baseCommandArgs = loadCommandArguments(args);
+
+ int exit;
try {
- int exit = handleCommand(args);
+ Entry entry = new Entry(sysConfiguration, baseCommandArgs);
+
+ SecurityContextFactory securityContext = SecurityContextFactory
+ .load(entry.sysConfiguration, entry.baseCommandArgs);
+
+ exit = securityContext.doAs(
+ () -> entry.runCommand());
+
System.exit(exit);
} catch (Exception e) {
LOG.error("Exception occurred when run command .", e);
- System.exit(ERROR_EXIT_CODE);
+ exit = EntryConstants.ERROR_EXIT_CODE_UNKNOWN_FAILED;
+ System.exit(exit);
}
}
@SuppressWarnings("checkstyle:RegexpSingleline")
- private static int handleCommand(String[] args) throws IOException, InterruptedException {
- if (args.length < 1) {
- CommandArgsParser.printHelp();
- System.out.println("Please specify an action. Supported action are " + CommandAction.RUN_COMMAND);
- return ERROR_EXIT_CODE;
- }
- BaseCommandArgsWithUnknownOptions baseCommandArgsWithUnknownOptions = buildCommandArgs(args);
- ProcessBuilder processBuilder = buildProcessBuilder(baseCommandArgsWithUnknownOptions);
- return startProcessBuilder(processBuilder, baseCommandArgsWithUnknownOptions.getBaseCommandArgs());
- }
-
- static BaseCommandArgsWithUnknownOptions buildCommandArgs(String[] args) {
+ public static BaseCommandArgs loadCommandArguments(String[] args) {
if (args.length < 1) {
CommandArgsParser.printHelp();
- throw new IllegalArgumentException("Please specify an action. Supported action are " + CommandAction.RUN_COMMAND);
+ // No action given: report it after the help text and exit with the command-error code.
+ System.out.println("Please specify an action. Supported action are " + CommandAction.RUN_COMMAND);
+ System.exit(EntryConstants.ERROR_EXIT_CODE_COMMAND_ERROR);
}
final String mainCommand = args[0];
final String[] params = Arrays.copyOfRange(args, 1, args.length);
BaseCommandArgs baseCommandArgs = new BaseCommandArgs();
- String[] unknownOptions = CommandArgsParser.parseArguments(params, baseCommandArgs);
+ // The parser's return value (the unknown options) is stored on the args object itself.
+ baseCommandArgs.setUnknownOptions(CommandArgsParser.parseArguments(params, baseCommandArgs));
baseCommandArgs.setMainAction(mainCommand);
- return BaseCommandArgsWithUnknownOptions.builder().baseCommandArgs(baseCommandArgs).unknownOptions(unknownOptions).build();
+
+ return baseCommandArgs;
+ }
+
+ private int runCommand() throws IOException, InterruptedException {
+ ProcessBuilder processBuilder = buildProcessBuilder(sysConfiguration, baseCommandArgs);
+ return startProcessBuilder(processBuilder, baseCommandArgs);
}
- private static ProcessBuilder buildProcessBuilder(BaseCommandArgsWithUnknownOptions baseCommandArgsWithUnknownOptions) {
- BaseCommandArgs baseCommandArgs = baseCommandArgsWithUnknownOptions.getBaseCommandArgs();
+ private ProcessBuilder buildProcessBuilder(BitSailConfiguration sysConfiguration,
+ BaseCommandArgs baseCommandArgs) throws IOException {
BitSailConfiguration jobConfiguration =
ConfigParser.fromRawConfPath(baseCommandArgs.getJobConf());
String engineName = baseCommandArgs.getEngineName();
- LOG.info("Input argument engine name: {}.", engineName);
+ LOG.info("Final engine: {}.", engineName);
EngineRunner engineRunner = RUNNERS.get(StringUtils.upperCase(engineName));
if (Objects.isNull(engineRunner)) {
throw new IllegalArgumentException(String.format("Engine %s not support now.", engineName));
}
- engineRunner.addEngineClasspath();
+ engineRunner.initializeEngine(sysConfiguration);
+ engineRunner.loadLibrary(classLoader);
ProcessBuilder procBuilder = engineRunner.getProcBuilder(
jobConfiguration,
- baseCommandArgs,
- baseCommandArgsWithUnknownOptions.getUnknownOptions());
+ baseCommandArgs);
LOG.info("Engine {}'s command: {}.", baseCommandArgs.getEngineName(), procBuilder.command());
return procBuilder;
}
- private static int startProcessBuilder(ProcessBuilder procBuilder, BaseCommandArgs runCommandArgs) throws IOException, InterruptedException {
- procBuilder
- .redirectOutput(ProcessBuilder.Redirect.INHERIT)
+ private int startProcessBuilder(ProcessBuilder procBuilder,
+ BaseCommandArgs baseCommandArgs) throws IOException, InterruptedException {
+ procBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
.redirectError(ProcessBuilder.Redirect.INHERIT);
Thread hook = new Thread(() -> {
@@ -134,7 +158,7 @@ private static int startProcessBuilder(ProcessBuilder procBuilder, BaseCommandAr
});
Runtime.getRuntime().addShutdownHook(hook);
- int i = internalRunProcess(procBuilder, runCommandArgs);
+ int i = internalRunProcess(procBuilder, baseCommandArgs);
Runtime.getRuntime().removeShutdownHook(hook);
return i;
}
@@ -145,10 +169,6 @@ private static int internalRunProcess(ProcessBuilder procBuilder,
running = true;
process = procBuilder.start();
}
-
- if (!runCommandArgs.isDetach()) {
- return process.waitFor();
- }
- return ERROR_EXIT_CODE;
+ return process.waitFor();
}
}
diff --git a/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/constants/EntryConstants.java b/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/constants/EntryConstants.java
new file mode 100644
index 000000000..a3af38def
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/constants/EntryConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.bytedance.bitsail.client.entry.constants;
+
+/**
+ * Process exit codes used by the BitSail client entry point.
+ */
+public class EntryConstants {
+
+  /** Conventional success exit code. */
+  public static final int SUCCESS_EXIT_CODE = 0;
+  /** Used when the command line is unusable, e.g. no action was supplied. */
+  public static final int ERROR_EXIT_CODE_COMMAND_ERROR = 1;
+  /** Presumably for a run that failed; no usage is visible in this change. */
+  public static final int ERROR_EXIT_CODE_RUN_FAILED = 2;
+  /** Used when an unexpected exception escapes while running the command. */
+  public static final int ERROR_EXIT_CODE_UNKNOWN_FAILED = 3;
+
+  private EntryConstants() {
+    // Constants holder; not meant to be instantiated.
+  }
+}
diff --git a/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/security/SecurityContextFactory.java b/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/security/SecurityContextFactory.java
new file mode 100644
index 000000000..c39ea0283
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry/src/main/java/com/bytedance/bitsail/client/entry/security/SecurityContextFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.bytedance.bitsail.client.entry.security;
+
+import com.bytedance.bitsail.base.extension.SecurityModule;
+import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.component.format.security.kerberos.option.KerberosOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.concurrent.Callable;
+
+/**
+ * Gathers every {@link SecurityModule} found through {@link ServiceLoader} and logs each of
+ * them in before invoking a caller-supplied action.
+ */
+public class SecurityContextFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(SecurityContextFactory.class);
+
+  private final BitSailConfiguration securityConfiguration;
+  private final List<SecurityModule> securityModules;
+
+  /**
+   * Static factory equivalent to calling the public constructor.
+   *
+   * @param sysConfiguration system configuration that receives the Kerberos settings
+   * @param baseCommandArgs  parsed command-line arguments carrying the Kerberos settings
+   * @return a new factory instance
+   */
+  public static SecurityContextFactory load(BitSailConfiguration sysConfiguration,
+                                            BaseCommandArgs baseCommandArgs) {
+    return new SecurityContextFactory(sysConfiguration, baseCommandArgs);
+  }
+
+  /**
+   * Merges the Kerberos command-line settings into {@code sysConfiguration} and loads all
+   * available security modules.
+   *
+   * @param sysConfiguration system configuration; it is modified in place
+   * @param baseCommandArgs  parsed command-line arguments
+   */
+  public SecurityContextFactory(BitSailConfiguration sysConfiguration, BaseCommandArgs baseCommandArgs) {
+    this.securityConfiguration = mergeSecurityConfiguration(sysConfiguration, baseCommandArgs);
+    this.securityModules = loadSecurityModules();
+  }
+
+  /**
+   * Copies the four Kerberos command-line values onto the given configuration.
+   * The same instance that was passed in is returned; no copy is made.
+   */
+  private BitSailConfiguration mergeSecurityConfiguration(BitSailConfiguration sysConfiguration,
+                                                          BaseCommandArgs baseCommandArgs) {
+    sysConfiguration.set(KerberosOptions.KERBEROS_ENABLE, baseCommandArgs.isEnableKerberos());
+    sysConfiguration.set(KerberosOptions.KERBEROS_KEYTAB_PATH, baseCommandArgs.getKeytabPath());
+    sysConfiguration.set(KerberosOptions.KERBEROS_KRB5_CONF_PATH, baseCommandArgs.getKrb5ConfPath());
+    sysConfiguration.set(KerberosOptions.KERBEROS_PRINCIPAL, baseCommandArgs.getPrincipal());
+    return sysConfiguration;
+  }
+
+  /** Collects every {@link SecurityModule} provider that {@link ServiceLoader} yields. */
+  private List<SecurityModule> loadSecurityModules() {
+    List<SecurityModule> securityModules = new ArrayList<>();
+    for (SecurityModule module : ServiceLoader.load(SecurityModule.class)) {
+      securityModules.add(module);
+    }
+    return securityModules;
+  }
+
+  /**
+   * Initializes and logs in every loaded security module, then runs {@code callable}.
+   *
+   * @param callable action to run once all modules have logged in
+   * @param <T>      result type of the action
+   * @return whatever {@code callable} returns
+   * @throws Exception if a module fails to initialize or log in, or if the action throws
+   */
+  public <T> T doAs(Callable<T> callable) throws Exception {
+    for (SecurityModule securityModule : securityModules) {
+      securityModule.initializeModule(securityConfiguration);
+      LOG.info("Module {} start login.", securityModule.getClass().getSimpleName());
+      securityModule.login();
+    }
+    return callable.call();
+  }
+}
diff --git a/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/EntryTest.java b/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/EntryTest.java
index 835378c7c..a309a5e19 100644
--- a/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/EntryTest.java
+++ b/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/EntryTest.java
@@ -17,27 +17,19 @@
package com.bytedance.bitsail.client.entry;
-import com.bytedance.bitsail.client.api.command.BaseCommandArgsWithUnknownOptions;
+import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
+import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class EntryTest {
- @Test(expected = IllegalArgumentException.class)
- public void testBuildCommandArgsWithEmptyArgs() {
- String[] args = {};
- Entry.buildCommandArgs(args);
- }
-
@Test
public void testBuildCommandArgs() {
String[] args = new String[] {"run", "--engine", "flink", "-d", "-sae"};
- BaseCommandArgsWithUnknownOptions commandArgsWithUnknownOptions =
- Entry.buildCommandArgs(args);
- assertEquals(commandArgsWithUnknownOptions.getUnknownOptions().length, 1);
- assertEquals(commandArgsWithUnknownOptions.getBaseCommandArgs().getEngineName(), "flink");
- assertEquals(commandArgsWithUnknownOptions.getBaseCommandArgs().getMainAction(), "run");
+ BaseCommandArgs baseCommandArgs = Entry.loadCommandArguments(args);
+ Assert.assertEquals(baseCommandArgs.getUnknownOptions().length, 1);
+ Assert.assertEquals(baseCommandArgs.getEngineName(), "flink");
+ Assert.assertEquals(baseCommandArgs.getMainAction(), "run");
}
}
diff --git a/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/security/SecurityContextFactoryTest.java b/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/security/SecurityContextFactoryTest.java
new file mode 100644
index 000000000..4a32d016b
--- /dev/null
+++ b/bitsail-clients/bitsail-client-entry/src/test/java/com/bytedance/bitsail/client/entry/security/SecurityContextFactoryTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.bytedance.bitsail.client.entry.security;
+
+import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
+import com.bytedance.bitsail.client.entry.Entry;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that the Kerberos command-line switches end up on the parsed {@link BaseCommandArgs}.
+ */
+public class SecurityContextFactoryTest {
+
+  @Test
+  public void testInitKerberosCommandArgs() {
+    // Values handed to the parser and expected back unchanged.
+    final String keytab = "test.keytab";
+    final String principal = "test_principal";
+    final String krb5Conf = "test_krb5.conf";
+
+    String[] commandLine = new String[] {
+        "run",
+        "--enable-kerberos",
+        "--keytab-path", keytab,
+        "--principal", principal,
+        "--krb5-conf-path", krb5Conf
+    };
+
+    BaseCommandArgs parsed = Entry.loadCommandArguments(commandLine);
+
+    Assert.assertTrue(parsed.isEnableKerberos());
+    Assert.assertEquals(keytab, parsed.getKeytabPath());
+    Assert.assertEquals(principal, parsed.getPrincipal());
+    Assert.assertEquals(krb5Conf, parsed.getKrb5ConfPath());
+  }
+}
\ No newline at end of file
diff --git a/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/pom.xml b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/pom.xml
new file mode 100644
index 000000000..fa0284c58
--- /dev/null
+++ b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/pom.xml
@@ -0,0 +1,92 @@
+
+
+
+
+ bitsail-component-security
+ com.bytedance.bitsail
+ ${revision}
+
+ 4.0.0
+
+ bitsail-component-security-kerberos
+ ${revision}
+
+
+ 8
+ 8
+
+
+
+
+ com.bytedance.bitsail
+ bitsail-base
+
+
+
+ com.bytedance.bitsail
+ bitsail-common
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ log4j
+ log4j
+
+
+ org.codehaus.jackson
+ jackson-xc
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ javax.servlet
+ servlet-api
+
+
+ metrics-core
+ com.codahale.metrics
+
+
+ netty
+ io.netty
+
+
+ org.byted.infsec
+ dps
+
+
+ org.apache.commons
+ commons-math3
+
+
+
+
+
\ No newline at end of file
diff --git a/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/common/KerberosConstants.java b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/common/KerberosConstants.java
new file mode 100644
index 000000000..a8b7a5550
--- /dev/null
+++ b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/common/KerberosConstants.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.component.format.security.kerberos.common;
+
+/**
+ * String constants shared by the Kerberos security component: JVM system property names,
+ * JAAS login-module option names and values, login-module class names and temp file paths.
+ */
+public class KerberosConstants {
+  // JVM system property names.
+  public static final String SYSTEM_ENV_JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+  public static final String SYSTEM_ENV_KRB5_CONF_PATH = "java.security.krb5.conf";
+  public static final String USE_SUBJECT_CREDS_ONLY = "javax.security.auth.useSubjectCredsOnly";
+  public static final String HADOOP_AUTH_KEY = "hadoop.security.authentication";
+
+  // JAAS Krb5LoginModule option names.
+  public static final String OPTION_KEYTAB = "keyTab";
+  // NOTE(review): the Sun Krb5LoginModule spells this option "useKeyTab"; confirm which
+  // login module is expected to consume "useKeytab".
+  public static final String OPTION_USE_KEY_TAB = "useKeytab";
+  public static final String OPTION_CREDS_TYPE = "credsType";
+  public static final String OPTION_DO_NOT_PROMPT = "doNotPrompt";
+  public static final String OPTION_STORE_KEY = "storeKey";
+  public static final String OPTION_PRINCIPAL = "principal";
+  public static final String OPTION_REFRESH_KRB5_CONFIG = "refreshKrb5Config";
+
+  // JAAS option values.
+  public static final String VALUE_CREDS_TYPE_BOTH = "both";
+  public static final String VALUE_BOOLEAN_TRUE = "true";
+
+  // Fully qualified Krb5LoginModule class names for Sun/Oracle and IBM JVMs.
+  public static final String KRB5_LOGIN_MODULE_SUN = "com.sun.security.auth.module.Krb5LoginModule";
+  public static final String KRB5_LOGIN_MODULE_IBM = "com.ibm.security.auth.module.Krb5LoginModule";
+
+  // Temp file locations; all three live in the same "kerberos-bitsail" directory.
+  public static final String TMP_FILEPATH_KRB5_CONTENT = "/tmp/kerberos-bitsail/krb5.conf";
+  public static final String TMP_FILEPATH_KEYTAB_CONTENT = "/tmp/kerberos-bitsail/principal.keytab";
+  public static final String TMP_FILEPATH_JAAS_CONTENT = "/tmp/kerberos-bitsail/jaas.conf";
+
+  // Kerberos authentication mode identifiers.
+  public static final String KERBEROS_AUTH_MODE_APP = "app";
+  public static final String KERBEROS_AUTH_MODE_CONNECTOR = "connector";
+
+  private KerberosConstants() {
+    // Constants holder; not meant to be instantiated.
+  }
+}
diff --git a/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgsWithUnknownOptions.java b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/common/KerberosErrorCode.java
similarity index 53%
rename from bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgsWithUnknownOptions.java
rename to bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/common/KerberosErrorCode.java
index fe54e4a65..4a265a815 100644
--- a/bitsail-clients/bitsail-client-api/src/main/java/com/bytedance/bitsail/client/api/command/BaseCommandArgsWithUnknownOptions.java
+++ b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/common/KerberosErrorCode.java
@@ -15,14 +15,35 @@
* limitations under the License.
*/
-package com.bytedance.bitsail.client.api.command;
+package com.bytedance.bitsail.component.format.security.kerberos.common;
-import lombok.Builder;
-import lombok.Getter;
+import com.bytedance.bitsail.common.exception.ErrorCode;
-@Builder
-@Getter
-public class BaseCommandArgsWithUnknownOptions {
- private BaseCommandArgs baseCommandArgs;
- private String[] unknownOptions;
-}
\ No newline at end of file
+/**
+ * Error codes defined for the Kerberos security component.
+ */
+public enum KerberosErrorCode implements ErrorCode {
+
+  JAAS_CONF_NOT_EXIST("Kerberos-00", "Jaas config file does not exist, which is required.");
+
+  /** Short identifier of the error. */
+  private final String errorCode;
+
+  /** Human-readable explanation of the error. */
+  private final String description;
+
+  KerberosErrorCode(String errorCode, String description) {
+    this.errorCode = errorCode;
+    this.description = description;
+  }
+
+  /** @return the short identifier of this error */
+  @Override
+  public String getCode() {
+    return this.errorCode;
+  }
+
+  /** @return the human-readable explanation of this error */
+  @Override
+  public String getDescription() {
+    return this.description;
+  }
+
+  /** Renders both the code and its description in a single line. */
+  @Override
+  public String toString() {
+    return String.format("Code:[%s], Describe:[%s]", this.errorCode, this.description);
+  }
+}
diff --git a/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/option/KerberosOptions.java b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/option/KerberosOptions.java
new file mode 100644
index 000000000..88ee4a7aa
--- /dev/null
+++ b/bitsail-components/bitsail-components-security/bitsail-component-security-kerberos/src/main/java/com/bytedance/bitsail/component/format/security/kerberos/option/KerberosOptions.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.component.format.security.kerberos.option;
+
+import com.bytedance.bitsail.common.option.CommonOptions;
+import com.bytedance.bitsail.common.option.ConfigOption;
+
+import com.alibaba.fastjson.TypeReference;
+
+import java.util.Map;
+
+import static com.bytedance.bitsail.common.option.ConfigOptions.key;
+
+public interface KerberosOptions extends CommonOptions {
+
+ String SYS_KERBEROS = "sys.kerberos.";
+
+ ConfigOption KERBEROS_ENABLE =
+ key(SYS_KERBEROS + "enable")
+ .defaultValue(false);
+
+ ConfigOption KERBEROS_KEYTAB_PATH =
+ key(SYS_KERBEROS + "keytab_path")
+ .noDefaultValue(String.class);
+
+ ConfigOption KERBEROS_PRINCIPAL =
+ key(SYS_KERBEROS + "principal")
+ .noDefaultValue(String.class);
+
+ ConfigOption KERBEROS_KRB5_CONF_PATH =
+ key(SYS_KERBEROS + "krb5_conf_path")
+ .noDefaultValue(String.class);
+
+ ConfigOption KERBEROS_KRB5_USE_SUBJECT_CREDITS_ONLY =
+ key(SYS_KERBEROS + "use_subject_credits_only")
+ .noDefaultValue(Boolean.class);
+
+ ConfigOption