Skip to content

Commit

Permalink
[Feature][Tool] Add e2e case
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Apr 9, 2024
1 parent 63be624 commit 89826f7
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ public void execute() throws CommandExecuteException, ConfigCheckException {
.entrySet()
.forEach(
pluginTypePluginDiscoveryEntry -> {
pringOptionRulesByPluginTypeAndIdentifier(
printOptionRulesByPluginTypeAndIdentifier(
pluginTypePluginDiscoveryEntry.getValue(),
pluginIdentifier);
});
} else {
pringOptionRulesByPluginTypeAndIdentifier(
printOptionRulesByPluginTypeAndIdentifier(
DISCOVERY_MAP.get(pluginType), pluginIdentifier);
}
}
}

private void pringOptionRulesByPluginTypeAndIdentifier(
private void printOptionRulesByPluginTypeAndIdentifier(
PluginDiscovery DISCOVERY_MAP, String pluginIdentifier) {
ImmutableTriple<PluginIdentifier, List<Option<?>>, List<Option<?>>> triple =
DISCOVERY_MAP.getOptionRules(pluginIdentifier);
Expand All @@ -108,11 +108,11 @@ private void pringOptionRulesByPluginTypeAndIdentifier(
private void printSupportedPlugins(
PluginType pluginType, LinkedHashMap<PluginIdentifier, OptionRule> plugins) {
System.out.println(StringUtils.LF + StringUtils.capitalize(pluginType.getType()));
String supportedSinks =
String supportedPlugins =
plugins.keySet().stream()
.map(pluginIdentifier -> pluginIdentifier.getPluginName())
.collect(Collectors.joining(StringUtils.SPACE));
System.out.println(supportedSinks + StringUtils.LF);
System.out.println(supportedPlugins + StringUtils.LF);
}

private void printOptionRules(
Expand Down
12 changes: 12 additions & 0 deletions seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-paimon</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-transforms-v2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,45 @@

package org.apache.seatunnel.core.starter.seatunnel;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceFactory;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.transform.sql.SQLTransformFactory;

import org.apache.commons.lang3.StringUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.testcontainers.containers.Container;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Only support for seatunnel")
@DisabledOnOs(OS.WINDOWS)
@Slf4j
public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResource {

Expand All @@ -60,6 +72,29 @@ public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResourc
}
};

/** All supported transforms. */
private static final Set<String> TRANSFORMS =
new HashSet() {
{
add("Copy");
add("FieldMapper");
add("Filter");
add("FilterRowKind");
add("JsonPath");
add("Replace");
add("Split");
add("Sql");
}
};

// Match paimon source and paimon sink
private static final Pattern pattern1 =
Pattern.compile(
"(Paimon (source|sink))(.*?)(?=(Paimon (source|sink)|$))", Pattern.DOTALL);
// Match required options and optional options
private static final Pattern pattern2 =
Pattern.compile("Required Options:(.*?)(?:Optional Options: (.*?))?$", Pattern.DOTALL);

@Override
public void startUp() throws Exception {}

Expand All @@ -68,49 +103,200 @@ public void tearDown() throws Exception {}

@TestTemplate
public void testExecCheck(TestContainer container) throws Exception {
String[] case1 = {"-l -pt source"};
execCheck(container, case1, PluginType.SOURCE);
String[] case1 = {"-l"};
Container.ExecResult execResult = execCommand(container, case1);
checkResultForCase1(execResult);

String[] case2 = {"-l -pt sink"};
execCheck(container, case2, PluginType.SINK);
String[] case2 = {"-l -pt source"};
execCheck(container, case2, PluginType.SOURCE);

String[] case3 = {"-o Paimon -pt sink"};
Container.ExecResult execResult = container.executeConnectorCheck(case3);
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(StringUtils.isBlank(execResult.getStderr()));
log.info(execResult.getStdout());
String[] case3 = {"-l -pt sink"};
execCheck(container, case3, PluginType.SINK);

String[] case4 = {"-o Paimon"};
Container.ExecResult execResult4 = execCommand(container, case4);
checkStdOutForOptionRule(execResult4.getStdout());

String[] case5 = {"-o Paimon -pt source"};
Container.ExecResult execResult5 = execCommand(container, case5);
checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(execResult5.getStdout());

String[] case6 = {"-o Paimon -pt sink"};
Container.ExecResult execResult6 = execCommand(container, case6);
checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(execResult6.getStdout());

String[] case7 = {"-o sql -pt transform"};
Container.ExecResult execResult7 = execCommand(container, case7);
checkStdOutForOptionRuleOfSinglePluginTypeWithTransform(
execResult7.getStdout(), new SQLTransformFactory());
}

private static void execCheck(TestContainer container, String[] args, PluginType pluginType)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeConnectorCheck(args);
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(StringUtils.isBlank(execResult.getStderr()));
log.info(execResult.getStdout());
String pluginTypeStr = pluginType.getType();
Set<String> connectorIdentifier =
ContainerUtil.getConnectorIdentifier("seatunnel", pluginTypeStr).stream()
.filter(connectorIdenf -> !EXCLUDE_CONNECTOR.contains(connectorIdenf))
private void checkStdOutForOptionRule(String stdout) {
Matcher matcher1 = pattern1.matcher(stdout.trim());
String paimonSourceContent = StringUtils.EMPTY;
String paimonSinkContent = StringUtils.EMPTY;
Assertions.assertTrue(matcher1.groupCount() >= 3);
while (matcher1.find()) {
String type = matcher1.group(2).trim();
if (type.equals(PluginType.SOURCE.getType())) {
paimonSourceContent = matcher1.group(3).trim();
}
if (type.equals(PluginType.SINK.getType())) {
paimonSinkContent = matcher1.group(3).trim();
}
}
Assertions.assertTrue(StringUtils.isNoneBlank(paimonSourceContent));
Assertions.assertTrue(StringUtils.isNoneBlank(paimonSinkContent));
checkOptionRuleOfSinglePluginType(new PaimonSourceFactory(), paimonSourceContent);
checkOptionRuleOfSinglePluginType(new PaimonSinkFactory(), paimonSinkContent);
}

private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform(
String stdout, Factory factory) {
Matcher matcher2 = pattern2.matcher(stdout);
Assertions.assertTrue(matcher2.find());
Assertions.assertTrue(matcher2.groupCount() >= 2);
OptionRule optionRule = factory.optionRule();
List<Option<?>> exceptRequiredOptions =
optionRule.getRequiredOptions().stream()
.flatMap(requiredOption -> requiredOption.getOptions().stream())
.collect(Collectors.toList());
String requiredOptions = matcher2.group(1).trim();
String optionalOptions = matcher2.group(2);
Assertions.assertEquals(
exceptRequiredOptions.size(), requiredOptions.split(StringUtils.LF).length);
Assertions.assertEquals(
optionRule.getOptionalOptions().size(),
StringUtils.isBlank(optionalOptions)
? 0
: optionalOptions.split(StringUtils.LF).length);
}

private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdout) {
Matcher matcher1 = pattern1.matcher(stdout.trim());
Assertions.assertTrue(matcher1.find());
Assertions.assertTrue(matcher1.groupCount() >= 3);
String paimonPluginContent = matcher1.group(3).trim();
Assertions.assertTrue(StringUtils.isNoneBlank(paimonPluginContent));
String type = matcher1.group(2).trim();
if (type.equals(PluginType.SOURCE.getType())) {
checkOptionRuleOfSinglePluginType(new PaimonSourceFactory(), paimonPluginContent);
} else if (type.equals(PluginType.SINK.getType())) {
checkOptionRuleOfSinglePluginType(new PaimonSinkFactory(), paimonPluginContent);
}
}

private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRules) {
Matcher matcher2 = pattern2.matcher(optionRules);
Assertions.assertTrue(matcher2.find());
Assertions.assertTrue(matcher2.groupCount() >= 2);
String requiredOptions = matcher2.group(1).trim();
String optionalOptions = matcher2.group(2).trim();
Assertions.assertTrue(StringUtils.isNoneBlank(requiredOptions));
Assertions.assertTrue(StringUtils.isNoneBlank(optionalOptions));
OptionRule optionRule = factory.optionRule();
List<Option<?>> exceptRequiredOptions =
optionRule.getRequiredOptions().stream()
.flatMap(requiredOption -> requiredOption.getOptions().stream())
.collect(Collectors.toList());
Assertions.assertEquals(
exceptRequiredOptions.size(), requiredOptions.split(StringUtils.LF).length);
Assertions.assertEquals(
optionRule.getOptionalOptions().size(),
StringUtils.isBlank(optionalOptions)
? 0
: optionalOptions.split(StringUtils.LF).length);
}

private void checkResultForCase1(Container.ExecResult execResult) {
String[] lines = execResult.getStdout().trim().split(StringUtils.LF);
String sourcesStr = StringUtils.EMPTY;
String sinkStr = StringUtils.EMPTY;
String transformStr = StringUtils.EMPTY;
for (int i = 0; i < lines.length; i++) {
// 如果是"Source",则将后面的内容作为source字符串
if (lines[i].equalsIgnoreCase(PluginType.SOURCE.getType())) {
sourcesStr =
StringUtils.capitalize(PluginType.SOURCE.getType())
+ StringUtils.LF
+ lines[i + 1];
} else if (lines[i].equalsIgnoreCase(PluginType.SINK.getType())) {
sinkStr =
StringUtils.capitalize(PluginType.SINK.getType())
+ StringUtils.LF
+ lines[i + 1];
} else if (lines[i].equalsIgnoreCase(PluginType.TRANSFORM.getType())) {
transformStr =
StringUtils.capitalize(PluginType.TRANSFORM.getType())
+ StringUtils.LF
+ lines[i + 1];
}
}
Assertions.assertTrue(StringUtils.isNoneBlank(sourcesStr));
Assertions.assertTrue(StringUtils.isNoneBlank(sinkStr));
Assertions.assertTrue(StringUtils.isNoneBlank(transformStr));
checkStdOutForSinglePluginTypeOfConnector(PluginType.SOURCE, sourcesStr);
checkStdOutForSinglePluginTypeOfConnector(PluginType.SINK, sinkStr);
checkStdOutForSinglePluginTypeOfTransform(PluginType.TRANSFORM, transformStr);
}

private void checkStdOutForSinglePluginTypeOfTransform(PluginType pluginType, String stdOut) {
Set<String> transforms = getPluginIdentifiers(pluginType, stdOut);
Assertions.assertTrue(!transforms.isEmpty());
Set<String> diff =
TRANSFORMS.stream()
.filter(
connectorIdentifierStr ->
!transforms.contains(connectorIdentifierStr.toLowerCase()))
.collect(Collectors.toSet());
Set<String> connectors =
Assertions.assertTrue(diff.isEmpty());
}

private Set<String> getPluginIdentifiers(PluginType pluginType, String stdOut) {
Set<String> transforms =
new TreeSet<>(
Arrays.asList(
execResult
.getStdout()
.trim()
stdOut.trim()
.replaceFirst(
StringUtils.capitalize(pluginTypeStr),
StringUtils.capitalize(pluginType.getType()),
StringUtils.EMPTY)
.trim()
.toLowerCase()
.split(StringUtils.SPACE)));
return transforms;
}

private Container.ExecResult execCommand(TestContainer container, String[] case1)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeConnectorCheck(case1);
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(StringUtils.isBlank(execResult.getStderr()));
log.info(execResult.getStdout());
return execResult;
}

private void execCheck(TestContainer container, String[] args, PluginType pluginType)
throws IOException, InterruptedException {
Container.ExecResult execResult = execCommand(container, args);
checkStdOutForSinglePluginTypeOfConnector(pluginType, execResult.getStdout());
}

private void checkStdOutForSinglePluginTypeOfConnector(PluginType pluginType, String stdOut) {
Set<String> connectorIdentifier =
ContainerUtil.getConnectorIdentifier("seatunnel", pluginType.getType()).stream()
.filter(connectorIdenf -> !EXCLUDE_CONNECTOR.contains(connectorIdenf))
.collect(Collectors.toSet());
Set<String> connectors = getPluginIdentifiers(pluginType, stdOut);
Assertions.assertTrue(!connectors.isEmpty());
// check size
Assertions.assertEquals(connectorIdentifier.size(), connectors.size());
Set<String> diff =
connectorIdentifier.stream()
.filter(
connectorIdentifierStr ->
!connectors.contains(connectorIdentifierStr.toLowerCase()))
.collect(Collectors.toSet());
// check equals
Assertions.assertTrue(diff.isEmpty());
}
}

0 comments on commit 89826f7

Please sign in to comment.