Skip to content

Commit

Permalink
[Improve][E2E] Reactor jdbc e2e with new api.
Browse files Browse the repository at this point in the history
  • Loading branch information
FlechazoW committed Feb 21, 2023
1 parent 49d9172 commit c2c4767
Show file tree
Hide file tree
Showing 18 changed files with 1,375 additions and 984 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<p3c-pmd.version>1.3.0</p3c-pmd.version>
<maven-scm-provider-jgit.version>1.9.5</maven-scm-provider-jgit.version>
<testcontainer.version>1.17.3</testcontainer.version>
<testcontainer.version>1.17.6</testcontainer.version>
<spotless.version>2.29.0</spotless.version>
<!-- Option args -->
<skipUT>false</skipUT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>

<!-- drivers -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
Expand All @@ -26,17 +27,15 @@
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand All @@ -48,8 +47,11 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.given;
Expand All @@ -58,137 +60,174 @@
public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResource {

protected static final String HOST = "HOST";
protected GenericContainer<?> dbServer;
protected JdbcCase jdbcCase;

abstract JdbcCase getJdbcCase();

abstract void compareResult() throws SQLException, IOException;

abstract void clearSinkTable();

abstract SeaTunnelRow initTestData();

protected Connection createAndChangeDatabase(Connection connection) {
// do nothing
return connection;
}

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
protected final ContainerExtendedFactory extendedFactory =
container -> {
Container.ExecResult extraCommands =
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/jars && cd /tmp/jars && curl -O "
+ jdbcCase.getDriverJar());
Assertions.assertEquals(0, extraCommands.getExitCode());
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ driverUrl());
Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
};

private void getContainer()
throws SQLException, MalformedURLException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
jdbcCase = this.getJdbcCase();
dbServer =
new GenericContainer<>(jdbcCase.getDockerImage())
.withNetwork(NETWORK)
.withNetworkAliases(jdbcCase.getNetworkAliases())
.withEnv(jdbcCase.getContainerEnv())
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(jdbcCase.getDockerImage())));
dbServer.setPortBindings(
Lists.newArrayList(
String.format("%s:%s", jdbcCase.getLocalPort(), jdbcCase.getPort())));
Startables.deepStart(Stream.of(dbServer)).join();
protected GenericContainer<?> dbServer;
protected JdbcCase jdbcCase;
protected Connection connection;

given().ignoreExceptions()
.await()
.atMost(360, TimeUnit.SECONDS)
.untilAsserted(
() -> {
this.initializeJdbcConnection(jdbcCase.getJdbcUrl());
});
this.initializeJdbcTable();
}
abstract JdbcCase getJdbcCase();

abstract void compareResult() throws SQLException, IOException;

protected Connection initializeJdbcConnection(String jdbcUrl)
abstract String driverUrl();

abstract Pair<String[], List<SeaTunnelRow>> initTestData();

abstract GenericContainer<?> initContainer();

protected void initializeJdbcConnection(String jdbcUrl)
throws SQLException, ClassNotFoundException, MalformedURLException,
InstantiationException, IllegalAccessException {
URLClassLoader urlClassLoader =
new URLClassLoader(
new URL[] {new URL(jdbcCase.getDriverJar())},
AbstractJdbcIT.class.getClassLoader());
new URL[] {new URL(driverUrl())}, AbstractJdbcIT.class.getClassLoader());
Thread.currentThread().setContextClassLoader(urlClassLoader);
Driver driver = (Driver) urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance();
Properties props = new Properties();
props.put("user", jdbcCase.getUserName());
props.put("password", jdbcCase.getPassword());
return driver.connect(jdbcUrl.replace(HOST, dbServer.getHost()), props);
this.connection = driver.connect(jdbcUrl.replace(HOST, dbServer.getHost()), props);
connection.setAutoCommit(false);
}

private void batchInsertData() {
try (Connection connection =
initializeJdbcConnection(
String.format(
jdbcCase.getJdbcTemplate(),
jdbcCase.getLocalPort(),
jdbcCase.getDataBase()))) {
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement =
connection.prepareStatement(jdbcCase.getInitDataSql())) {

for (int index = 0;
index < jdbcCase.getSeaTunnelRow().getFields().length;
index++) {
preparedStatement.setObject(
index + 1, jdbcCase.getSeaTunnelRow().getFields()[index]);
protected void insertTestData() {
try (PreparedStatement preparedStatement =
connection.prepareStatement(jdbcCase.getInsertSql())) {

List<SeaTunnelRow> rows = jdbcCase.getTestData().getValue();

for (SeaTunnelRow row : rows) {
for (int index = 0; index < row.getArity(); index++) {
preparedStatement.setObject(index + 1, row.getField(index));
}
preparedStatement.execute();
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception exception) {
log.error(ExceptionUtils.getMessage(exception));
throw new RuntimeException("get connection error", exception);
throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
}
}

private void initializeJdbcTable() {
try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) {
Connection newConnection = createAndChangeDatabase(connection);
Statement statement = newConnection.createStatement();
String createSource = jdbcCase.getDdlSource();
String createSink = jdbcCase.getDdlSink();
protected void createSchemaIfNeeded() {}

protected void createNeededTables() {
try (Statement statement = connection.createStatement()) {
String createTemplate = jdbcCase.getCreateSql();

String createSource =
String.format(
createTemplate,
buildTableInfoWithSchema(
jdbcCase.getDatabase(), jdbcCase.getSourceTable()));
String createSink =
String.format(
createTemplate,
buildTableInfoWithSchema(
jdbcCase.getDatabase(), jdbcCase.getSinkTable()));

statement.execute(createSource);
if (StringUtils.isNotEmpty(createSink)) {
statement.execute(createSink);
}
statement.execute(createSink);

connection.commit();
} catch (Exception exception) {
log.error(ExceptionUtils.getMessage(exception));
throw new RuntimeException("get connection error", exception);
throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
}
}

public String insertTable(String schema, String table, String... fields) {
String columns =
Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));

return "INSERT INTO "
+ buildTableInfoWithSchema(schema, table)
+ " ("
+ columns
+ " )"
+ " VALUES ("
+ placeholders
+ ")";
}

public void clearTable(String schema, String table) {
try (Statement statement = connection.createStatement()) {
statement.execute("TRUNCATE TABLE " + buildTableInfoWithSchema(schema, table));
} catch (SQLException e) {
throw new SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e);
}
}

/**
* Some rdbms need quote field.
*
* @param field field of rdbms.
* @return quoted field.
*/
public String quoteIdentifier(String field) {
return "`" + field + "`";
}

public String buildTableInfoWithSchema(String schema, String table) {
if (StringUtils.isNotBlank(schema)) {
return quoteIdentifier(schema) + "." + quoteIdentifier(table);
} else {
return quoteIdentifier(table);
}
this.batchInsertData();
}

@BeforeAll
@Override
public void startUp() throws Exception {
this.getContainer();
public void startUp() {
dbServer = initContainer();
jdbcCase = getJdbcCase();

Startables.deepStart(Stream.of(dbServer)).join();

given().ignoreExceptions()
.await()
.atMost(360, TimeUnit.SECONDS)
.untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));

createSchemaIfNeeded();
createNeededTables();
insertTestData();
}

@Override
public void tearDown() throws Exception {
public void tearDown() throws SQLException {
if (dbServer != null) {
dbServer.close();
}

if (connection != null) {
connection.close();
}
}

@TestTemplate
public void testJdbcDb(TestContainer container)
throws IOException, InterruptedException, SQLException {
Container.ExecResult execResult = container.executeJob(jdbcCase.getConfigFile());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
this.compareResult();

compareResult();
clearTable(jdbcCase.getDatabase(), jdbcCase.getSinkTable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.apache.commons.lang3.tuple.Pair;

import lombok.Builder;
import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
Expand All @@ -35,16 +38,14 @@ public class JdbcCase {
private String password;
private int port;
private int localPort;
private String dataBase;
private String database;
private String sourceTable;
private String sinkTable;
private String driverJar;
private String jdbcTemplate;
private String jdbcUrl;
private String ddlSource;
private String ddlSink;
private String initDataSql;
private String createSql;
private String insertSql;
private String configFile;
private SeaTunnelRow seaTunnelRow;
private Pair<String[], List<SeaTunnelRow>> testData;
private Map<String, String> containerEnv;
}
Loading

0 comments on commit c2c4767

Please sign in to comment.