Skip to content

Commit

Permalink
Feature: support kerberos (#9)
Browse files Browse the repository at this point in the history
* Support kerberos authentication.
  • Loading branch information
BlockLiu authored Oct 13, 2022
1 parent 1a5099c commit a28ed03
Show file tree
Hide file tree
Showing 40 changed files with 2,057 additions and 89 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ At present, ***BitSail*** is mainly designed with the ELT model, which have EB d
- In Streaming mode, auto-detect the exists checkpoint and apply when job restart.
- ...

## Requirements

The latest version of bitsail has the following minimal requirements:

- Java 8 and higher for the build is required. For usage Java 8 is a minimum requirement;
- Maven 3.6 and higher;
- Operating system: no specific requirements (tested on Windows and Linux).

## Support Connectors

<table>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.base.extension;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;

import java.io.IOException;
import java.io.Serializable;

public interface SecurityModule extends Serializable {

void initializeModule(BitSailConfiguration securityConfiguration);

void login() throws IOException;

void logout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,18 @@ public class BaseCommandArgs implements CommandArgs {

@Parameter(names = {"--conf"})
private String jobConf;

@Parameter(names = {"--enable-kerberos"})
private boolean enableKerberos = false;

@Parameter(names = {"--keytab-path"})
private String keytabPath;

@Parameter(names = {"--principal"})
private String principal;

@Parameter(names = {"--krb5-conf-path"})
private String krb5ConfPath;

private String[] unknownOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@
import com.bytedance.bitsail.client.api.command.BaseCommandArgs;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;

import java.io.IOException;
import java.io.Serializable;
import java.net.URLClassLoader;

/**
* Created 2022/8/5
*/
public interface EngineRunner extends Serializable {
default void addEngineClasspath() {
default void loadLibrary(URLClassLoader classLoader) {
return;
}

void initializeEngine(BitSailConfiguration sysConfiguration);

ProcessBuilder getProcBuilder(BitSailConfiguration jobConfiguration,
BaseCommandArgs baseCommandArgs,
String[] args);
BaseCommandArgs baseCommandArgs) throws IOException;

String engineName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.client.api.command;

import org.junit.Assert;
import org.junit.Test;

public class KerberosCommandArgsParserTest {

@Test
public void testParseKerberosArgs() {
String[] args = new String[]{
"--keytab-path", "/etc/kerberos/keytab",
"--principal", "test_user",
"--krb5-conf-path", "/etc/kerberos/krb5.conf",
"--unknownkey", "unknown_value"
};

BaseCommandArgs kerberosCommandArgs = new BaseCommandArgs();
CommandArgsParser.parseArguments(args, kerberosCommandArgs);

Assert.assertEquals("/etc/kerberos/keytab", kerberosCommandArgs.getKeytabPath());
Assert.assertEquals("test_user", kerberosCommandArgs.getPrincipal());
Assert.assertEquals("/etc/kerberos/krb5.conf", kerberosCommandArgs.getKrb5ConfPath());
Assert.assertFalse(kerberosCommandArgs.isEnableKerberos());
}

@Test
public void testParseMoreArgs() {
String[] args = new String[]{
"--enable-kerberos",
"--keytab-path", "/root/dts_test/test.keytab",
"--principal", "admin/admin@HADOOP.COM",
"--krb5-conf-path", "/etc/krb5.conf"
};
BaseCommandArgs kerberosCommandArgs = new BaseCommandArgs();
CommandArgsParser.parseArguments(args, kerberosCommandArgs);
Assert.assertTrue(kerberosCommandArgs.isEnableKerberos());
}
}
13 changes: 13 additions & 0 deletions bitsail-clients/bitsail-client-entry-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,24 @@
<version>${revision}</version>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-security-kerberos</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class FlinkRunCommandArgs implements CommandArgs {

@SuppressWarnings("checkstyle:MagicNumber")
@Parameter(names = "--priority",
description = "Specify the job's priority in resource manaegr, eg: yarn.")
description = "Specify the job's priority in resource manager, eg: yarn.")
private int priority = 5;

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,32 @@
import com.bytedance.bitsail.client.api.engine.EngineRunner;
import com.bytedance.bitsail.client.api.utils.PackageResolver;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.configuration.BitSailSystemConfiguration;
import com.bytedance.bitsail.common.exception.CommonErrorCode;
import com.bytedance.bitsail.entry.flink.command.FlinkRunCommandArgs;
import com.bytedance.bitsail.entry.flink.configuration.FlinkRunnerConfigOptions;
import com.bytedance.bitsail.entry.flink.deployment.DeploymentSupplier;
import com.bytedance.bitsail.entry.flink.deployment.DeploymentSupplierFactory;
import com.bytedance.bitsail.entry.flink.savepoint.FlinkRunnerSavepointLoader;
import com.bytedance.bitsail.entry.flink.security.FlinkSecurityHandler;
import com.bytedance.bitsail.entry.flink.utils.FlinkPackageResolver;

import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Created 2022/8/5
Expand All @@ -52,39 +60,65 @@ public class FlinkEngineRunner implements EngineRunner {

private DeploymentSupplierFactory deploymentSupplierFactory;

public FlinkEngineRunner() {
deploymentSupplierFactory = new DeploymentSupplierFactory();
private BitSailConfiguration sysConfiguration;

private Path flinkDir;

@Override
public void initializeEngine(BitSailConfiguration sysConfiguration) {
this.deploymentSupplierFactory = new DeploymentSupplierFactory();
this.sysConfiguration = sysConfiguration;
this.flinkDir = Paths.get(sysConfiguration.getNecessaryOption(FlinkRunnerConfigOptions.FLINK_HOME, CommonErrorCode.CONFIG_ERROR));
LOG.info("Find flink dir = {} in System configuration.", flinkDir);
if (!Files.exists(flinkDir)) {
LOG.error("Flink dir = {} not exists in fact, plz check the system configuration.", flinkDir);
throw new IllegalArgumentException(String.format("Flink dir %s not exists.", flinkDir));
}
}

@Override
public ProcessBuilder getProcBuilder(BitSailConfiguration jobConfiguration, BaseCommandArgs baseCommandArgs, String[] args) {
@SneakyThrows
public void loadLibrary(URLClassLoader classLoader) {
Path flinkLibDir = FlinkPackageResolver.getFlinkLibDir(flinkDir);
LOG.info("Load flink library from path: {}.", flinkLibDir);

try (Stream<Path> libraries = Files.list(flinkLibDir)) {
List<Path> flinkRuntimeLibraries = libraries
.filter(library -> StringUtils.startsWith(library.getFileName().toString(), FlinkPackageResolver.FLINK_LIB_DIST_JAR_NAME))
.collect(Collectors.toList());
Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
method.setAccessible(true);
for (Path runtimeLibrary : flinkRuntimeLibraries) {
method.invoke(classLoader, runtimeLibrary.toFile().toURL());
LOG.info("Load flink runtime library {} to classpath.", runtimeLibrary);
}
}
}

@Override
public ProcessBuilder getProcBuilder(BitSailConfiguration jobConfiguration,
BaseCommandArgs baseCommandArgs) throws IOException {
String argsMainAction = baseCommandArgs.getMainAction();

switch (argsMainAction) {
case CommandAction.RUN_COMMAND:
return getRunProcBuilder(jobConfiguration, baseCommandArgs, args);
return getRunProcBuilder(jobConfiguration, baseCommandArgs);
default:
throw new UnsupportedOperationException(String.format("Main action %s not support in flink engine.", argsMainAction));
}
}

ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseCommandArgs baseCommandArgs, String[] args) {
ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration,
BaseCommandArgs baseCommandArgs) throws IOException {
FlinkRunCommandArgs flinkCommandArgs = new FlinkRunCommandArgs();
String[] unknownArgs = CommandArgsParser.parseArguments(args, flinkCommandArgs);
CommandArgsParser.parseArguments(baseCommandArgs.getUnknownOptions(), flinkCommandArgs);

BitSailConfiguration sysConfiguration = BitSailSystemConfiguration.loadSysConfiguration();
DeploymentSupplier deploymentSupplier = deploymentSupplierFactory.getDeploymentSupplier(flinkCommandArgs,
jobConfiguration);

ProcessBuilder flinkProcBuilder = new ProcessBuilder();
List<String> flinkCommands = Lists.newArrayList();

String flinkDir = sysConfiguration.getNecessaryOption(FlinkRunnerConfigOptions.FLINK_HOME, CommonErrorCode.CONFIG_ERROR);
LOG.info("Find flink dir = {} in System configuration.", flinkDir);
if (!Files.exists(Paths.get(flinkDir))) {
LOG.error("Flink dir = {} not exists in fact, plz check the system configuration.", flinkDir);
throw new IllegalArgumentException(String.format("Flink dir %s not exists.", flinkDir));
}
flinkCommands.add(flinkDir + "/bin/flink");
flinkCommands.add(flinkCommandArgs.getExecutionMode());
deploymentSupplier.addDeploymentCommands(baseCommandArgs, flinkCommands);
Expand All @@ -95,14 +129,12 @@ ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseComm
flinkCommandArgs,
flinkCommands);

flinkCommands.add("-D");
flinkCommands.add("execution.attached=" + !baseCommandArgs.isDetach());
if (!baseCommandArgs.isDetach()) {
flinkCommands.add("-sae");
} else {
flinkCommands.add("--detached");
}

flinkCommands.addAll(Arrays.asList(unknownArgs));

if (sysConfiguration.fieldExists(FlinkRunnerConfigOptions.FLINK_DEFAULT_PROPERTIES)) {
for (Map.Entry<String, String> property : sysConfiguration.getFlattenMap(FlinkRunnerConfigOptions.FLINK_DEFAULT_PROPERTIES.key()).entrySet()) {
LOG.info("Add System property {} = {}.", property.getKey(), property.getValue());
Expand All @@ -122,11 +154,14 @@ ProcessBuilder getRunProcBuilder(BitSailConfiguration jobConfiguration, BaseComm
flinkCommands.add(baseCommandArgs.getJobConf());

flinkProcBuilder.command(flinkCommands);

FlinkSecurityHandler.processSecurity(sysConfiguration, flinkProcBuilder, flinkDir);
return flinkProcBuilder;
}

@Override
public String engineName() {
return "flink";
}

}
Loading

0 comments on commit a28ed03

Please sign in to comment.