Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Connector-V2][JDBC-connector] support Jdbc dm #2377

Merged
merged 10 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` f

## Options

| name | type | required | default value |
| --- | --- | --- | --- |
| url | String | Yes | - |
| driver | String | Yes | - |
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
| connection_check_timeout_sec | Int | No | 30 |
| max_retries | Int | No | 3 |
| batch_size | Int | No | 300 |
| batch_interval_ms | Int | No | 1000 |
| is_exactly_once | Boolean | No | false |
| xa_data_source_class_name | String | No | - |
| max_commit_attempts | Int | No | 3 |
| transaction_timeout_sec | Int | No | -1 |
| name | type | required | default value |
|------------------------------|---------|----------|---------------|
| url | String | Yes | - |
| driver | String | Yes | - |
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
| connection_check_timeout_sec | Int | No | 30 |
| max_retries | Int | No | 3 |
| batch_size | Int | No | 300 |
| batch_interval_ms | Int | No | 1000 |
| is_exactly_once | Boolean | No | false |
| xa_data_source_class_name | String | No | - |
| max_commit_attempts | Int | No | 3 |
| transaction_timeout_sec | Int | No | -1 |

### driver [string]
The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
Expand Down Expand Up @@ -64,7 +64,7 @@ For batch writing, when the number of buffers reaches the number of `batch_size`
Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set `xa_data_source_class_name`.

### xa_data_source_class_name[string]
The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource` and postgresql is `org.postgresql.xa.PGXADataSource`
The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and please refer to appendix for other data sources

### max_commit_attempts[int]
The number of retries for transaction commit failures
Expand All @@ -76,6 +76,15 @@ The timeout after the transaction is opened, the default is -1 (never timeout).
In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup. For example, postgres needs to set `max_prepared_transactions > 1`
Such as `ALTER SYSTEM set max_prepared_transactions to 10`.

## appendix
there are some reference value for params above.

| datasource | driver | url | xa_data_source_class_name | maven |
|------------|--------------------------|-------------------------------------------|-------------------------------------|---------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |

## Example
Simple
```
Expand Down
32 changes: 21 additions & 11 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ supports query SQL and can achieve projection effect.

## Options

| name | type | required | default value |
| --- | --- | --- | --- |
| url | String | Yes | - |
| driver | String | Yes | - |
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
| connection_check_timeout_sec | Int | No | 30 |
| partition_column | String | No | - |
| partition_upper_bound | Long | No | - |
| partition_lower_bound | Long | No | - |
| name | type | required | default value |
|------------------------------|--------|----------|---------------|
| url | String | Yes | - |
| driver | String | Yes | - |
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
| connection_check_timeout_sec | Int | No | 30 |
| partition_column | String | No | - |
| partition_upper_bound | Long | No | - |
| partition_lower_bound | Long | No | - |

### driver [string]
The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
Expand Down Expand Up @@ -66,6 +66,16 @@ The partition_column min value for scan, if not set SeaTunnel will query databas
## tips
If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.


## appendix
there are some reference value for params above.

| datasource | driver | url | maven |
|------------|--------------------------|-------------------------------------------|---------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |

## Example
simple:
```Jdbc {
Expand Down
16 changes: 1 addition & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
<lombok.version>1.18.0</lombok.version>
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.3.3</postgresql.version>
<dm-jdbc.version>8.1.2.141</dm-jdbc.version>
<skip.pmd.check>false</skip.pmd.check>
<maven.deploy.skip>false</maven.deploy.skip>
<maven.javadoc.skip>false</maven.javadoc.skip>
Expand Down Expand Up @@ -206,21 +207,6 @@
<artifactId>seatunnel-config-shade</artifactId>
<version>${seatunnel.config.shade.version}</version>
</dependency>

<!--Because the license is not in compliance, if you need to use MySQL, you can add it yourself-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>

<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down
12 changes: 9 additions & 3 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@

<properties>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<pg.version>42.3.3</pg.version>
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.3.3</postgresql.version>
<dm-jdbc.version>8.1.2.141</dm-jdbc.version>
</properties>

<dependencies>
Expand All @@ -47,7 +48,12 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${pg.version}</version>
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>${dm-jdbc.version}</version>
</dependency>

<dependency>
Expand All @@ -57,4 +63,4 @@
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class DmdbDialect implements JdbcDialect {

@Override
public String dialectName() {
return "DM";
}

@Override
public JdbcRowConverter getRowConverter() {
return new DmdbJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new DmdbTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/**
* Factory for {@link DmdbDialect}.
*/
@AutoService(JdbcDialectFactory.class)
public class DmdbDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:dm:");
}

@Override
public JdbcDialect create() {
return new DmdbDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class DmdbJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "DM";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
laglangyue marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading