Skip to content

Commit

Permalink
[Test][E2E] chunjun-e2e complete postgre tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
chaozwn authored and FlechazoW committed Aug 25, 2022
1 parent d9a616d commit 47dfb61
Show file tree
Hide file tree
Showing 12 changed files with 551 additions and 27 deletions.
35 changes: 35 additions & 0 deletions chunjun-connectors/chunjun-connector-postgresql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>log4j:log4j</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>shade.core.com.google.common</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
Expand Down
7 changes: 7 additions & 0 deletions chunjun-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.19</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class FlinkStandaloneContainer extends GenericContainer<FlinkStandaloneCo

public FlinkStandaloneContainer(String imageName) throws URISyntaxException {
super(
new ImageFromDockerfile(imageName, false)
new ImageFromDockerfile(imageName, true)
.withDockerfile(Paths.get(FLINK_STANDALONE_DOCKERFILE.toURI())));
waitingFor(
new WaitStrategy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class MysqlBaseContainer extends JdbcDatabaseContainer {
public static final Integer MYSQL_PORT = 3306;

public MysqlBaseContainer(String imageName, Path dockerfile) throws URISyntaxException {
super(new ImageFromDockerfile(imageName, false).withDockerfile(dockerfile));
super(new ImageFromDockerfile(imageName, true).withDockerfile(dockerfile));
withEnv("MYSQL_USER", "admin");
withEnv("MYSQL_PASSWORD", password);
withEnv("MYSQL_ROOT_PASSWORD", password);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.containers.postgre;

import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import org.testcontainers.images.builder.ImageFromDockerfile;

import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.Duration;

public class PostgreContainer extends JdbcDatabaseContainer {
private static final URL POSTGRE_DOCKERFILE =
PostgreContainer.class.getClassLoader().getResource("docker/postgre/Dockerfile");

private static final String PG_TEST_USER = "postgre";
private static final String PG_TEST_PASSWORD = "postgre";
private static final String PG_TEST_DATABASE = "postgre";
protected static final String PG_DRIVER_CLASS = "org.postgresql.Driver";
private static final String POSTGRE_HOST = "chunjun-e2e-postgres";
public static final Integer POSTGRESQL_PORT = 5432;

public PostgreContainer() throws URISyntaxException {
super(
new ImageFromDockerfile(POSTGRE_HOST, true)
.withDockerfile(Paths.get(POSTGRE_DOCKERFILE.toURI())));
withExposedPorts(POSTGRESQL_PORT);
waitingFor(
new WaitStrategy() {
@Override
public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {}

@Override
public WaitStrategy withStartupTimeout(Duration startupTimeout) {
return null;
}
});
}

@Override
public String getDriverClassName() {
return PG_DRIVER_CLASS;
}

@Override
public String getJdbcUrl() {
String additionalUrlParams = this.constructUrlParameters("?", "&");
return "jdbc:postgresql://"
+ this.getContainerIpAddress()
+ ":"
+ this.getMappedPort(POSTGRESQL_PORT)
+ "/"
+ PG_TEST_DATABASE
+ additionalUrlParams;
}

@Override
public String getUsername() {
return PG_TEST_USER;
}

@Override
public String getPassword() {
return PG_TEST_PASSWORD;
}

@Override
protected String getTestQueryString() {
return "SELECT 1";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.test.standalone.postgre.sql;

public class PostgreSqlE2eITCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.test.standalone.postgre.sync;

import com.dtstack.chunjun.connector.containers.postgre.PostgreContainer;
import com.dtstack.chunjun.connector.entity.JobAccumulatorResult;
import com.dtstack.chunjun.connector.test.utils.ChunjunFlinkStandaloneTestEnvironment;
import com.dtstack.chunjun.connector.test.utils.JdbcProxy;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PostgreSyncE2eITCase extends ChunjunFlinkStandaloneTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(PostgreSyncE2eITCase.class);

protected static final String POSTGRE_HOST = "chunjun-e2e-postgre";

private static final URL POSTGRE_INIT_SQL_URL =
PostgreSyncE2eITCase.class.getClassLoader().getResource("docker/postgre/init.sql");

public PostgreContainer postgre;

@Override
public void before() throws Exception {
super.before();
LOG.info("Starting containers...");
postgre = new PostgreContainer();
postgre.withNetwork(NETWORK);
postgre.withNetworkAliases(POSTGRE_HOST);
postgre.withLogConsumer(new Slf4jLogConsumer(LOG));
Startables.deepStart(Stream.of(postgre)).join();
Thread.sleep(5000);
initPostgre();
LOG.info("Containers are started.");
}

@Override
public void after() {
if (postgre != null) {
postgre.stop();
}
super.after();
}

@Test
public void testOracleToOracle() throws Exception {
submitSyncJobOnStandLone(
ChunjunFlinkStandaloneTestEnvironment.CHUNJUN_HOME
+ "/chunjun-examples/json/postgresql/postgre_postgre.json");
JobAccumulatorResult jobAccumulatorResult = waitUntilJobFinished(Duration.ofMinutes(30));

Assert.assertEquals(jobAccumulatorResult.getNumRead(), 9);
Assert.assertEquals(jobAccumulatorResult.getNumWrite(), 9);
JdbcProxy proxy =
new JdbcProxy(
postgre.getJdbcUrl(),
postgre.getUsername(),
postgre.getPassword(),
postgre.getDriverClassName());
List<String> expectResult =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.14",
"102,car battery,12V car battery,8.1",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
"104,hammer,12oz carpenter's hammer,0.75",
"105,hammer,14oz carpenter's hammer,0.875",
"106,hammer,16oz carpenter's hammer,1.0",
"107,rocks,box of assorted rocks,5.3",
"108,jacket,water resistent black wind breaker,0.1",
"109,spare tire,24 inch spare tire,22.2");
proxy.checkResultWithTimeout(
expectResult,
"inventory.products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
}

private void initPostgre() throws IOException, SQLException {
String initSqls =
FileUtils.readFileToString(new File(POSTGRE_INIT_SQL_URL.getPath()), "UTF-8");
List<String> executeSqls =
Arrays.stream(initSqls.split(";"))
.filter(sql -> StringUtils.isNotEmpty(StringUtils.strip(sql)))
.collect(Collectors.toList());
try (Connection conn = getPgJdbcConnection();
Statement statement = conn.createStatement()) {
for (String sql : executeSqls) {
statement.execute(sql);
}
} catch (SQLException e) {
LOG.error("Execute Oracle init sql failed.", e);
throw e;
}
}

private Connection getPgJdbcConnection() throws SQLException {
return DriverManager.getConnection(
postgre.getJdbcUrl(), postgre.getUsername(), postgre.getPassword());
}
}
3 changes: 2 additions & 1 deletion chunjun-e2e/src/test/resources/docker/oracle/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ FROM rohitbasu77/oracle11g:latest

LABEL maintainer="www.dtstack.com"


ENV LANG=C.UTF-8
ENV TZ=Asia/Shanghai
EXPOSE 1521
EXPOSE 22

Loading

0 comments on commit 47dfb61

Please sign in to comment.