diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index f8c883cec76..ed6285eae12 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -15,21 +15,21 @@ Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` f ## Options -| name | type | required | default value | -| --- | --- | --- | --- | -| url | String | Yes | - | -| driver | String | Yes | - | -| user | String | No | - | -| password | String | No | - | -| query | String | Yes | - | -| connection_check_timeout_sec | Int | No | 30 | -| max_retries | Int | No | 3 | -| batch_size | Int | No | 300 | -| batch_interval_ms | Int | No | 1000 | -| is_exactly_once | Boolean | No | false | -| xa_data_source_class_name | String | No | - | -| max_commit_attempts | Int | No | 3 | -| transaction_timeout_sec | Int | No | -1 | +| name | type | required | default value | +|------------------------------|---------|----------|---------------| +| url | String | Yes | - | +| driver | String | Yes | - | +| user | String | No | - | +| password | String | No | - | +| query | String | Yes | - | +| connection_check_timeout_sec | Int | No | 30 | +| max_retries | Int | No | 3 | +| batch_size | Int | No | 300 | +| batch_interval_ms | Int | No | 1000 | +| is_exactly_once | Boolean | No | false | +| xa_data_source_class_name | String | No | - | +| max_commit_attempts | Int | No | 3 | +| transaction_timeout_sec | Int | No | -1 | ### driver [string] The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver. @@ -64,7 +64,7 @@ For batch writing, when the number of buffers reaches the number of `batch_size` Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set `xa_data_source_class_name`. ### xa_data_source_class_name[string] -The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource` and postgresql is `org.postgresql.xa.PGXADataSource` +The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and please refer to appendix for other data sources ### max_commit_attempts[int] The number of retries for transaction commit failures @@ -76,6 +76,15 @@ The timeout after the transaction is opened, the default is -1 (never timeout). In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup. For example, postgres needs to set `max_prepared_transactions > 1` Such as `ALTER SYSTEM set max_prepared_transactions to 10`. +## appendix +there are some reference value for params above. + +| datasource | driver | url | xa_data_source_class_name | maven | +|------------|--------------------------|-------------------------------------------|-------------------------------------|---------------------------------------------------------------| +| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | | +| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | + ## Example Simple ``` diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 5f1e47ac98e..ca229ee8291 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -20,17 +20,17 @@ supports query SQL and can achieve projection effect. ## Options -| name | type | required | default value | -| --- | --- | --- | --- | -| url | String | Yes | - | -| driver | String | Yes | - | -| user | String | No | - | -| password | String | No | - | -| query | String | Yes | - | -| connection_check_timeout_sec | Int | No | 30 | -| partition_column | String | No | - | -| partition_upper_bound | Long | No | - | -| partition_lower_bound | Long | No | - | +| name | type | required | default value | +|------------------------------|--------|----------|---------------| +| url | String | Yes | - | +| driver | String | Yes | - | +| user | String | No | - | +| password | String | No | - | +| query | String | Yes | - | +| connection_check_timeout_sec | Int | No | 30 | +| partition_column | String | No | - | +| partition_upper_bound | Long | No | - | +| partition_lower_bound | Long | No | - | ### driver [string] The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver. @@ -66,6 +66,16 @@ The partition_column min value for scan, if not set SeaTunnel will query databas ## tips If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## appendix +there are some reference value for params above. + +| datasource | driver | url | maven | +|------------|--------------------------|-------------------------------------------|---------------------------------------------------------------| +| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | | +| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | + ## Example simple: ```Jdbc { diff --git a/pom.xml b/pom.xml index 9beb6a1fe2b..1a904df22f1 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ 1.18.0 8.0.16 42.3.3 + 8.1.2.141 false false false @@ -206,21 +207,6 @@ seatunnel-config-shade ${seatunnel.config.shade.version} - - - - mysql - mysql-connector-java - ${mysql.version} - test - - - - org.postgresql - postgresql - ${postgresql.version} - - commons-codec commons-codec diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index eb4e1b37d4c..68b6ad2847b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -31,8 +31,9 @@ 5.2.5-HBase-2.x - 42.3.3 8.0.16 + 42.3.3 + 8.1.2.141 @@ -47,7 +48,12 @@ org.postgresql postgresql - ${pg.version} + ${postgresql.version} + + + com.dameng + DmJdbcDriver18 + ${dm-jdbc.version} @@ -57,4 +63,4 @@ - \ No newline at end of file + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java new file mode 100644 index 00000000000..f13bb1d7a28 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +public class DmdbDialect implements JdbcDialect { + + @Override + public String dialectName() { + return "DM"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new DmdbJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new DmdbTypeMapper(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java new file mode 100644 index 00000000000..0578bba3c58 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +/** + * Factory for {@link DmdbDialect}. + */ +@AutoService(JdbcDialectFactory.class) +public class DmdbDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:dm:"); + } + + @Override + public JdbcDialect create() { + return new DmdbDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java new file mode 100644 index 00000000000..6aa666314e4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class DmdbJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "DM"; + } + + @Override + public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { + return super.toInternal(rs, metaData, typeInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java new file mode 100644 index 00000000000..a5aea557163 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class DmdbTypeMapper implements JdbcDialectTypeMapper { + + // ============================data types===================== + private static final String DM_BIT = "BIT"; + + // ----------------------------number------------------------- + private static final String DM_NUMERIC = "NUMERIC"; + private static final String DM_NUMBER = "NUMBER"; + private static final String DM_DECIMAL = "DECIMAL"; + /** + * same to DECIMAL + */ + private static final String DM_DEC = "DEC"; + + // ----------------------------int----------------------------- + private static final String DM_INTEGER = "INTEGER"; + private static final String DM_INT = "INT"; + public static final String DM_PLS_INTEGER = "PLS_INTEGER"; + private static final String DM_BIGINT = "BIGINT"; + private static final String DM_TINYINT = "TINYINT"; + private static final String DM_BYTE = "BYTE"; + private static final String DM_SMALLINT = "SMALLINT"; + + // dm float is double for Cpp. + private static final String DM_FLOAT = "FLOAT"; + private static final String DM_DOUBLE = "DOUBLE"; + private static final String DM_DOUBLE_PRECISION = "DOUBLE PRECISION"; + private static final String DM_REAL = "REAL"; + + // DM_CHAR DM_CHARACTER DM_VARCHAR DM_VARCHAR2 max is 32767 + private static final String DM_CHAR = "CHAR"; + private static final String DM_CHARACTER = "CHARACTER"; + private static final String DM_VARCHAR = "VARCHAR"; + private static final String DM_VARCHAR2 = "VARCHAR2"; + private static final String DM_LONGVARCHAR = "LONGVARCHAR"; + private static final String DM_CLOB = "CLOB"; + private static final String DM_TEXT = "TEXT"; + private static final String DM_LONG = "LONG"; + + // ------------------------------time------------------------- + private static final String DM_DATE = "DATE"; + private static final String DM_TIME = "TIME"; + private static final String DM_TIMESTAMP = "TIMESTAMP"; + private static final String DM_DATETIME = "DATETIME"; + + // ---------------------------binary--------------------------- + private static final String DM_BINARY = "BINARY"; + private static final String DM_VARBINARY = "VARBINARY"; + + // -------------------------time interval----------------------- + private static final String DM_INTERVAL_YEAR_TO_MONTH = "INTERVAL YEAR TO MONTH"; + private static final String DM_INTERVAL_YEAR = "INTERVAL YEAR"; + private static final String DM_INTERVAL_MONTH = "INTERVAL MONTH"; + private static final String DM_INTERVAL_DAY = "INTERVAL DAY"; + private static final String DM_INTERVAL_DAY_TO_HOUR = "INTERVAL DAY TO HOUR"; + private static final String DM_INTERVAL_DAY_TO_MINUTE = "INTERVAL DAY TO MINUTE"; + private static final String DM_INTERVAL_DAY_TO_SECOND = "INTERVAL DAY TO SECOND"; + private static final String DM_INTERVAL_HOUR = "INTERVAL HOUR"; + private static final String DM_INTERVAL_HOUR_TO_MINUTE = "INTERVAL HOUR TO MINUTE"; + private static final String DM_INTERVAL_HOUR_TO_SECOND = "INTERVAL HOUR TO SECOND"; + private static final String DM_INTERVAL_MINUTE = "INTERVAL MINUTE"; + private static final String DM_INTERVAL_MINUTE_TO_SECOND = "INTERVAL MINUTE TO SECOND"; + private static final String DM_INTERVAL_SECOND = "INTERVAL SECOND"; + // time zone + private static final String DM_TIME_WITH_TIME_ZONE = "TIME WITH TIME ZONE"; + private static final String DM_TIMESTAMP_WITH_TIME_ZONE = "TIMESTAMP WITH TIME ZONE"; + private static final String TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE"; + + // ------------------------------blob------------------------- + public static final String DM_BLOB = "BLOB"; + public static final String DM_BFILE = "BFILE"; + public static final String DM_IMAGE = "IMAGE"; + public static final String DM_LONGVARBINARY = "LONGVARBINARY"; + + @Override + @SuppressWarnings("checkstyle:MagicNumber") + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String dmdbType = metadata.getColumnTypeName(colIndex).toUpperCase(); + int precision = metadata.getPrecision(colIndex); + switch (dmdbType) { + case DM_BIT: + return BasicType.BOOLEAN_TYPE; + + case DM_INT: + case DM_INTEGER: + case DM_PLS_INTEGER: + return BasicType.INT_TYPE; + + case DM_TINYINT: + case DM_BYTE: + return BasicType.BYTE_TYPE; + + case DM_SMALLINT: + return BasicType.SHORT_TYPE; + + case DM_BIGINT: + return BasicType.LONG_TYPE; + + case DM_NUMERIC: + case DM_NUMBER: + case DM_DECIMAL: + case DM_DEC: + if (precision > 0) { + return new DecimalType(precision, metadata.getScale(colIndex)); + } + return new DecimalType(38, 18); + + case DM_REAL: + return BasicType.FLOAT_TYPE; + + case DM_FLOAT: + case DM_DOUBLE_PRECISION: + case DM_DOUBLE: + return BasicType.DOUBLE_TYPE; + + case DM_CHAR: + case DM_CHARACTER: + case DM_VARCHAR: + case DM_VARCHAR2: + // 100G-1 byte + case DM_TEXT: + case DM_LONG: + case DM_LONGVARCHAR: + case DM_CLOB: + return BasicType.STRING_TYPE; + + case DM_TIMESTAMP: + case DM_DATETIME: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + + case DM_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + + case DM_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + + // 100G-1 byte + case DM_BLOB: + case DM_BINARY: + case DM_VARBINARY: + case DM_LONGVARBINARY: + case DM_IMAGE: + case DM_BFILE: + return PrimitiveByteArrayType.INSTANCE; + + //Doesn't support yet + case DM_INTERVAL_YEAR_TO_MONTH: + case DM_INTERVAL_YEAR: + case DM_INTERVAL_MONTH: + case DM_INTERVAL_DAY: + case DM_INTERVAL_DAY_TO_HOUR: + case DM_INTERVAL_DAY_TO_MINUTE: + case DM_INTERVAL_DAY_TO_SECOND: + case DM_INTERVAL_HOUR: + case DM_INTERVAL_HOUR_TO_MINUTE: + case DM_INTERVAL_HOUR_TO_SECOND: + case DM_INTERVAL_MINUTE: + case DM_INTERVAL_MINUTE_TO_SECOND: + case DM_INTERVAL_SECOND: + case DM_TIME_WITH_TIME_ZONE: + case DM_TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format("Doesn't support Dmdb type '%s' on column '%s' yet.", dmdbType, jdbcColumnName)); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml index e4361c3592e..4b4d1d9981e 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml @@ -46,6 +46,22 @@ 1.17.3 test + + mysql + mysql-connector-java + ${mysql.version} + provided + + + org.postgresql + postgresql + ${postgresql.version} + + + com.dameng + DmJdbcDriver18 + ${dm-jdbc.version} + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java new file mode 100644 index 00000000000..b88aab09350 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.jdbc; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class JdbcDmdbIT extends FlinkContainer { + + private static final String DOCKER_IMAGE = "laglangyue/dmdb8"; + private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver"; + private static final String HOST = "flink_e2e_dmdb"; + private static final String LOCAL_HOST = "localhost"; + private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236"; + private static final String USERNAME = "SYSDBA"; + private static final String PASSWORD = "SYSDBA"; + private static final String DATABASE = "SYSDBA"; + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private Connection jdbcConnection; + private GenericContainer dbServer; + + @BeforeEach + public void startDmdbContainer() throws ClassNotFoundException, SQLException { + dbServer = new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + dbServer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", 5236, 5236))); + Startables.deepStart(Stream.of(dbServer)).join(); + log.info("Dmdb container started"); + // wait for Dmdb fully start + Class.forName(DRIVER_CLASS); + given().ignoreExceptions() + .await() + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + initializeJdbcTable(); + } + + private void initializeJdbcConnection() throws SQLException { + jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD); + } + + /** + * init the table for DM_SERVER, DDL and DML for source and sink + */ + private void initializeJdbcTable() { + java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf"); + if (resource == null) { + throw new IllegalArgumentException("can't find find file"); + } + String file = resource.getFile(); + Config config = ConfigFactory.parseFile(new File(file)); + assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink"); + try (Statement statement = jdbcConnection.createStatement()) { + // source + String sourceTableDDL = config.getString("dm_table_source"); + statement.execute(sourceTableDDL); + String insertSQL = config.getString("DML"); + statement.execute(insertSQL); + // sink + String sinkTableDDL = config.getString("dm_table_sink"); + statement.execute(sinkTableDDL); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private void assertHasData(String table) { + try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) { + Statement statement = connection.createStatement(); + String sql = String.format("select * from %s.%s limit 1", DATABASE, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (SQLException e) { + throw new RuntimeException("test dm server image error", e); + } + } + + @AfterEach + public void closeDmdbContainer() throws SQLException { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + if (dbServer != null) { + dbServer.close(); + } + } + + @Test + @DisplayName("JDBC-DM container can be pull") + public void testDMDBImage() { + assertHasData(SOURCE_TABLE); + } + + @Test + @DisplayName("flink JDBC-DM test") + public void testJdbcDmdbSourceAndSink() throws IOException, InterruptedException, SQLException { + assertHasData(SOURCE_TABLE); + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_dm_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + assertHasData(SINK_TABLE); + } + +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf new file mode 100644 index 00000000000..056c252a1d1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +dm_table_source = """ +create table if not exists "SYSDBA".e2e_table_source +( + DM_BIT BIT, + DM_INT INT, + DM_INTEGER INTEGER, + DM_PLS_INTEGER PLS_INTEGER, + DM_TINYINT TINYINT, + + DM_BYTE BYTE, + DM_SMALLINT SMALLINT, + DM_BIGINT BIGINT, + + DM_NUMERIC NUMERIC, + DM_NUMBER NUMBER, + DM_DECIMAL DECIMAL, + DM_DEC DEC, + + DM_REAL REAL, + DM_FLOAT FLOAT, + DM_DOUBLE_PRECISION DOUBLE PRECISION, + DM_DOUBLE DOUBLE, + + DM_CHAR CHAR, + DM_CHARACTER CHARACTER, + DM_VARCHAR VARCHAR, + DM_VARCHAR2 VARCHAR2, + DM_TEXT TEXT, + DM_LONG LONG, + DM_LONGVARCHAR LONGVARCHAR, + DM_CLOB CLOB, + + DM_TIMESTAMP TIMESTAMP, + DM_DATETIME DATETIME, + DM_TIME TIME, + DM_DATE DATE, + + DM_BLOB BLOB, + DM_BINARY BINARY, + DM_VARBINARY VARBINARY, + DM_LONGVARBINARY LONGVARBINARY, + DM_IMAGE IMAGE, + DM_BFILE BFILE +) +""" + +dm_table_sink = """ +create table if not exists "SYSDBA".e2e_table_sink +( + DM_BIT BIT, + DM_INT INT, + DM_INTEGER INTEGER, + DM_PLS_INTEGER PLS_INTEGER, + DM_TINYINT TINYINT, + + DM_BYTE BYTE, + DM_SMALLINT SMALLINT, + DM_BIGINT BIGINT, + + DM_NUMERIC NUMERIC, + DM_NUMBER NUMBER, + DM_DECIMAL DECIMAL, + DM_DEC DEC, + + DM_REAL REAL, + DM_FLOAT FLOAT, + DM_DOUBLE_PRECISION DOUBLE PRECISION, + DM_DOUBLE DOUBLE, + + DM_CHAR CHAR, + DM_CHARACTER CHARACTER, + DM_VARCHAR VARCHAR, + DM_VARCHAR2 VARCHAR2, + DM_TEXT TEXT, + DM_LONG LONG, + DM_LONGVARCHAR LONGVARCHAR, + DM_CLOB CLOB, + + DM_TIMESTAMP TIMESTAMP, + DM_DATETIME DATETIME, + DM_TIME TIME, + DM_DATE DATE, + + DM_BLOB BLOB, + DM_BINARY BINARY, + DM_VARBINARY VARBINARY, + DM_LONGVARBINARY LONGVARBINARY, + DM_IMAGE IMAGE, + DM_BFILE BFILE +) +""" +// only need for source +DML = """ +INSERT INTO "SYSDBA".e2e_table_source ( +DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT, +DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE, +DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB, +DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE, +DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE) +VALUES +(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', + 'a', 'a', 'a', 'a', 'a', 'a', 'a', +'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00', '2022-08-13', +null, null, null, null, null, null) +""" \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf new file mode 100644 index 00000000000..ba2468ccfc5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Jdbc { + url = "jdbc:dm://flink_e2e_dmdb:5236" + driver = "dm.jdbc.driver.DmDriver" + connection_check_timeout_sec = 1000 + user = "SYSDBA" + password = "SYSDBA" + query = """select * from "SYSDBA".e2e_table_source""" + } + +} + +transform { + +} + +sink { + Jdbc { + url = "jdbc:dm://flink_e2e_dmdb:5236" + driver = "dm.jdbc.driver.DmDriver" + connection_check_timeout_sec = 1000 + user = "SYSDBA" + password = "SYSDBA" + query = """ +INSERT INTO SYSDBA.e2e_table_sink (DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER, + DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, + DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +""" + } +} + diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index 8d96fb66a38..a5242ef72c4 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -40,6 +40,23 @@ ${project.version} test + + mysql + mysql-connector-java + ${mysql.version} + provided + + + + org.postgresql + postgresql + ${postgresql.version} + + + com.dameng + DmJdbcDriver18 + ${dm-jdbc.version} + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java new file mode 100644 index 00000000000..d1eb5043d9e --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.jdbc; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class JdbcDmdbIT extends SparkContainer { + + private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8"; + private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver"; + private static final String HOST = "spark_e2e_dmdb"; + private static final String LOCAL_HOST = "localhost"; + private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236"; + private static final String USERNAME = "SYSDBA"; + private static final String PASSWORD = "SYSDBA"; + private static final String DATABASE = "SYSDBA"; + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private GenericContainer dbServer; + private Connection jdbcConnection; + + @BeforeEach + public void beforeAllForDM() { + try { + dbServer = new GenericContainer<>(DM_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + dbServer.setPortBindings(Lists.newArrayList("5236:5236")); + Startables.deepStart(Stream.of(dbServer)).join(); + log.info("dmdb container started"); + Class.forName(DRIVER_CLASS); + given().ignoreExceptions() + .await() + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + initializeJdbcTable(); + } catch (Exception ex) { + log.error("dm container init failed", ex); + throw new RuntimeException(ex); + } + } + + @AfterEach + public void closeDmdbContainer() throws SQLException { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + if (dbServer != null) { + dbServer.close(); + } + } + + private void initializeJdbcConnection() throws SQLException { + jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD); + } + + /** + * init the table for DM_SERVER, DDL and DML for source and sink + */ + private void initializeJdbcTable() { + URL resource = JdbcDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf"); + if (resource == null) { + throw new IllegalArgumentException("can't find find file"); + } + String file = resource.getFile(); + Config config = ConfigFactory.parseFile(new File(file)); + assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink"); + try (Statement statement = jdbcConnection.createStatement()) { + // source + String sourceTableDDL = config.getString("dm_table_source"); + statement.execute(sourceTableDDL); + String insertSQL = config.getString("DML"); + statement.execute(insertSQL); + // sink + String sinkTableDDL = config.getString("dm_table_sink"); + statement.execute(sinkTableDDL); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private void assertHasData(String table) { + try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) { + Statement statement = connection.createStatement(); + String sql = String.format("select * from %s.%s limit 1", DATABASE, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (SQLException e) { + throw new RuntimeException("test dm server image error", e); + } + } + + @Test + @DisplayName("JDBC-DM container can be pull") + public void testDMDBImage() { + assertHasData(SOURCE_TABLE); + } + + @Test + @DisplayName("spark JDBC-DM test for all type mapper") + public void testDMDBSourceToJdbcSink() throws SQLException, IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_dm_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + assertHasData(SINK_TABLE); + } + +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf new file mode 100644 index 00000000000..056c252a1d1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +dm_table_source = """ +create table if not exists "SYSDBA".e2e_table_source +( + DM_BIT BIT, + DM_INT INT, + DM_INTEGER INTEGER, + DM_PLS_INTEGER PLS_INTEGER, + DM_TINYINT TINYINT, + + DM_BYTE BYTE, + DM_SMALLINT SMALLINT, + DM_BIGINT BIGINT, + + DM_NUMERIC NUMERIC, + DM_NUMBER NUMBER, + DM_DECIMAL DECIMAL, + DM_DEC DEC, + + DM_REAL REAL, + DM_FLOAT FLOAT, + DM_DOUBLE_PRECISION DOUBLE PRECISION, + DM_DOUBLE DOUBLE, + + DM_CHAR CHAR, + DM_CHARACTER CHARACTER, + DM_VARCHAR VARCHAR, + DM_VARCHAR2 VARCHAR2, + DM_TEXT TEXT, + DM_LONG LONG, + DM_LONGVARCHAR LONGVARCHAR, + DM_CLOB CLOB, + + DM_TIMESTAMP TIMESTAMP, + DM_DATETIME DATETIME, + DM_TIME TIME, + DM_DATE DATE, + + DM_BLOB BLOB, + DM_BINARY BINARY, + DM_VARBINARY VARBINARY, + DM_LONGVARBINARY LONGVARBINARY, + DM_IMAGE IMAGE, + DM_BFILE BFILE +) +""" + +dm_table_sink = """ +create table if not exists "SYSDBA".e2e_table_sink +( + DM_BIT BIT, + DM_INT INT, + DM_INTEGER INTEGER, + DM_PLS_INTEGER PLS_INTEGER, + DM_TINYINT TINYINT, + + DM_BYTE BYTE, + DM_SMALLINT SMALLINT, + DM_BIGINT BIGINT, + + DM_NUMERIC NUMERIC, + DM_NUMBER NUMBER, + DM_DECIMAL DECIMAL, + DM_DEC DEC, + + DM_REAL REAL, + DM_FLOAT FLOAT, + DM_DOUBLE_PRECISION DOUBLE PRECISION, + DM_DOUBLE DOUBLE, + + DM_CHAR CHAR, + DM_CHARACTER CHARACTER, + DM_VARCHAR VARCHAR, + DM_VARCHAR2 VARCHAR2, + DM_TEXT TEXT, + DM_LONG LONG, + DM_LONGVARCHAR LONGVARCHAR, + DM_CLOB CLOB, + + DM_TIMESTAMP TIMESTAMP, + DM_DATETIME DATETIME, + DM_TIME TIME, + DM_DATE DATE, + + DM_BLOB BLOB, + DM_BINARY BINARY, + DM_VARBINARY VARBINARY, + DM_LONGVARBINARY LONGVARBINARY, + DM_IMAGE IMAGE, + DM_BFILE BFILE +) +""" +// only need for source +DML = """ +INSERT INTO "SYSDBA".e2e_table_source ( +DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT, +DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE, +DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB, +DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE, +DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE) +VALUES +(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1', + 'a', 'a', 'a', 'a', 'a', 'a', 'a', +'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00', '2022-08-13', +null, null, null, null, null, null) +""" \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf new file mode 100644 index 00000000000..466b5b5c008 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + Jdbc { + url = "jdbc:dm://spark_e2e_dmdb:5236" + driver = "dm.jdbc.driver.DmDriver" + connection_check_timeout_sec = 1000 + user = "SYSDBA" + password = "SYSDBA" + query = """select DM_INT,DM_VARCHAR from "SYSDBA".e2e_table_source""" + partition_column = "DM_INT" + } + +} + +transform { + +} + +sink { + Jdbc { + url = "jdbc:dm://spark_e2e_dmdb:5236" + driver = "dm.jdbc.driver.DmDriver" + connection_check_timeout_sec = 1000 + user = "SYSDBA" + password = "SYSDBA" + query = """ + INSERT INTO "SYSDBA".e2e_table_sink(DM_INT,DM_VARCHAR) + values (?,?) +""" + } +} + diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml index 96d8bbb04d9..a7a084bac14 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml @@ -55,6 +55,7 @@ + org.awaitility awaitility diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml index aecd54f826f..6e1b2aec5f9 100644 --- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml @@ -49,6 +49,7 @@ org.postgresql postgresql + ${postgresql.version} test diff --git a/seatunnel-server/seatunnel-app/pom.xml b/seatunnel-server/seatunnel-app/pom.xml index d485d9f10a9..6acf7ead60c 100644 --- a/seatunnel-server/seatunnel-app/pom.xml +++ b/seatunnel-server/seatunnel-app/pom.xml @@ -137,6 +137,7 @@ mysql mysql-connector-java + ${mysql.version} provided