Skip to content

Commit

Permalink
[Feature][Tool] Add e2e case and remove the code of print to the command
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Apr 8, 2024
1 parent ba56054 commit 19cf30c
Show file tree
Hide file tree
Showing 16 changed files with 367 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class CosFileSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
return FileSystemType.OSS.getFileSystemPluginName();
return FileSystemType.COS.getFileSystemPluginName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

setlocal enabledelayedexpansion

REM resolve links - %0 may be a softlink
for %%F in ("%~f0") do (
set "PRG=%%~fF"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,35 @@

package org.apache.seatunnel.core.starter.seatunnel.command;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.apache.seatunnel.core.starter.seatunnel.args.ConnectorCheckCommandArgs;
import org.apache.seatunnel.plugin.discovery.PluginDiscovery;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ConnectorCheckCommand implements Command<ConnectorCheckCommandArgs> {
private static final String OPTION_DESCRIPTION_FORMAT = ", Description: '%s'";

private static final String REQUIRED_OPTION_FORMAT = "Required Options: \n %s";

private static final String OPTIONAL_OPTION_FORMAT = "Optional Options: \n %s";

private static final Map<PluginType, PluginDiscovery> DISCOVERY_MAP = new HashMap();
private ConnectorCheckCommandArgs connectorCheckCommandArgs;
Expand All @@ -48,15 +60,20 @@ public ConnectorCheckCommand(ConnectorCheckCommandArgs connectorCheckCommandArgs
@Override
public void execute() throws CommandExecuteException, ConfigCheckException {
PluginType pluginType = connectorCheckCommandArgs.getPluginType();

// Print plugins(connectors and transforms)
if (connectorCheckCommandArgs.isListConnectors()) {
if (Objects.isNull(pluginType)) {
DISCOVERY_MAP
.values()
.forEach(pluginDiscovery -> pluginDiscovery.printSupportedPlugins());
.entrySet()
.forEach(
pluginTypePluginDiscoveryEntry ->
printSupportedPlugins(
pluginTypePluginDiscoveryEntry.getKey(),
pluginTypePluginDiscoveryEntry
.getValue()
.getPlugins()));
} else {
DISCOVERY_MAP.get(pluginType).printSupportedPlugins();
printSupportedPlugins(pluginType, DISCOVERY_MAP.get(pluginType).getPlugins());
}
}

Expand All @@ -65,13 +82,65 @@ public void execute() throws CommandExecuteException, ConfigCheckException {
if (StringUtils.isNoneBlank(pluginIdentifier)) {
if (Objects.isNull(pluginType)) {
DISCOVERY_MAP
.values()
.entrySet()
.forEach(
pluginDiscovery ->
pluginDiscovery.printOptionRules(pluginIdentifier));
pluginTypePluginDiscoveryEntry -> {
pringOptionRulesByPluginTypeAndIdentifier(
pluginTypePluginDiscoveryEntry.getValue(),
pluginIdentifier);
});
} else {
DISCOVERY_MAP.get(pluginType).printOptionRules(pluginIdentifier);
pringOptionRulesByPluginTypeAndIdentifier(
DISCOVERY_MAP.get(pluginType), pluginIdentifier);
}
}
}

private void pringOptionRulesByPluginTypeAndIdentifier(
PluginDiscovery DISCOVERY_MAP, String pluginIdentifier) {
ImmutableTriple<PluginIdentifier, List<Option<?>>, List<Option<?>>> triple =
DISCOVERY_MAP.getOptionRules(pluginIdentifier);
if (Objects.nonNull(triple.getLeft())) {
printOptionRules(triple.getLeft(), triple.getMiddle(), triple.getRight());
}
}

private void printSupportedPlugins(
PluginType pluginType, LinkedHashMap<PluginIdentifier, OptionRule> plugins) {
System.out.println(StringUtils.LF + StringUtils.capitalize(pluginType.getType()));
String supportedSinks =
plugins.keySet().stream()
.map(pluginIdentifier -> pluginIdentifier.getPluginName())
.collect(Collectors.joining(StringUtils.SPACE));
System.out.println(supportedSinks + StringUtils.LF);
}

private void printOptionRules(
PluginIdentifier pluginIdentifier,
List<Option<?>> requiredOptions,
List<Option<?>> optionOptions) {
System.out.println(
StringUtils.LF
+ pluginIdentifier.getPluginName()
+ StringUtils.SPACE
+ pluginIdentifier.getPluginType());
System.out.println(
String.format(REQUIRED_OPTION_FORMAT, getOptionRulesString(requiredOptions)));
System.out.println(
String.format(OPTIONAL_OPTION_FORMAT, getOptionRulesString(optionOptions)));
}

private static String getOptionRulesString(List<Option<?>> requiredOptions) {
String requiredOptionsString =
requiredOptions.stream()
.map(
option ->
String.format(
option.toString()
+ OPTION_DESCRIPTION_FORMAT,
option.getDescription())
+ StringUtils.LF)
.collect(Collectors.joining(StringUtils.SPACE));
return requiredOptionsString;
}
}
1 change: 1 addition & 0 deletions seatunnel-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<module>seatunnel-connector-v2-e2e</module>
<module>seatunnel-engine-e2e</module>
<module>seatunnel-transforms-v2-e2e</module>
<module>seatunnel-core-e2e</module>
</modules>

<properties>
Expand Down
29 changes: 29 additions & 0 deletions seatunnel-e2e/seatunnel-core-e2e/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-e2e</artifactId>
<version>${revision}</version>
</parent>

<artifactId>seatunnel-core-e2e</artifactId>
<packaging>pom</packaging>
<name>SeaTunnel : E2E : Core :</name>

<modules>
<module>seatunnel-starter-e2e</module>
</modules>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-e2e-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>
23 changes: 23 additions & 0 deletions seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-core-e2e</artifactId>
<version>${revision}</version>
</parent>

<artifactId>seatunnel-starter-e2e</artifactId>
<name>SeaTunnel : E2E : Core : Starter</name>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-starter</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.seatunnel;

import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.apache.commons.lang3.StringUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Only support for seatunnel")
@Slf4j
public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResource {

/**
* Connectors that do not implement the Factory interface should be excluded because they cannot
* be discovered by seatunnel-plugin-discovery todo: If these connectors implement the Factory
* interface in the future, it should be removed from here
*/
private static final Set<String> EXCLUDE_CONNECTOR =
new HashSet() {
{
add("TDengine");
add("SelectDBCloud");
}
};

@Override
public void startUp() throws Exception {}

@Override
public void tearDown() throws Exception {}

@TestTemplate
public void testExecCheck(TestContainer container) throws Exception {
String[] case1 = {"-l -pt source"};
execCheck(container, case1, PluginType.SOURCE);

String[] case2 = {"-l -pt sink"};
execCheck(container, case2, PluginType.SINK);

String[] case3 = {"-o Paimon -pt sink"};
Container.ExecResult execResult = container.executeConnectorCheck(case3);
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(StringUtils.isBlank(execResult.getStderr()));
log.info(execResult.getStdout());
}

private static void execCheck(TestContainer container, String[] args, PluginType pluginType)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeConnectorCheck(args);
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(StringUtils.isBlank(execResult.getStderr()));
log.info(execResult.getStdout());
String pluginTypeStr = pluginType.getType();
Set<String> connectorIdentifier =
ContainerUtil.getConnectorIdentifier("seatunnel", pluginTypeStr).stream()
.filter(connectorIdenf -> !EXCLUDE_CONNECTOR.contains(connectorIdenf))
.collect(Collectors.toSet());
Set<String> connectors =
new TreeSet<>(
Arrays.asList(
execResult
.getStdout()
.trim()
.replaceFirst(
StringUtils.capitalize(pluginTypeStr),
StringUtils.EMPTY)
.trim()
.toLowerCase()
.split(StringUtils.SPACE)));
Assertions.assertEquals(connectorIdentifier.size(), connectors.size());
Set<String> diff =
connectorIdentifier.stream()
.filter(
connectorIdentifierStr ->
!connectors.contains(connectorIdentifierStr.toLowerCase()))
.collect(Collectors.toSet());
Assertions.assertTrue(diff.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ void executeExtraCommands(ContainerExtendedFactory extendedFactory)

Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException;

default Container.ExecResult executeConnectorCheck(String[] args)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
};

default Container.ExecResult savepointJob(String jobId)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -56,6 +58,8 @@
import java.util.stream.Collectors;

import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.adaptPathForWin;
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.copyAllConnectorJarToContainer;

@NoArgsConstructor
@Slf4j
Expand All @@ -65,6 +69,7 @@ public class SeaTunnelContainer extends AbstractTestContainer {
protected static final String JDK_DOCKER_IMAGE = "openjdk:8";
private static final String CLIENT_SHELL = "seatunnel.sh";
protected static final String SERVER_SHELL = "seatunnel-cluster.sh";
protected static final String CONNECTOR_CHECK_SHELL = "seatunnel-connector.sh";
protected GenericContainer<?> server;
private final AtomicInteger runningCount = new AtomicInteger();

Expand Down Expand Up @@ -166,6 +171,23 @@ public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
extendedFactory.extend(server);
}

@Override
public Container.ExecResult executeConnectorCheck(String[] args)
throws IOException, InterruptedException {
// copy all connectors
copyAllConnectorJarToContainer(
server,
getConnectorModulePath(),
getConnectorNamePrefix(),
getConnectorType(),
SEATUNNEL_HOME);
final List<String> command = new ArrayList<>();
String binPath = Paths.get(SEATUNNEL_HOME, "bin", CONNECTOR_CHECK_SHELL).toString();
command.add(adaptPathForWin(binPath));
Arrays.stream(args).forEach(arg -> command.add(arg));
return executeCommand(server, command);
}

@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
Expand Down
Loading

0 comments on commit 19cf30c

Please sign in to comment.