Skip to content

Commit

Permalink
jdbc v2
Browse files Browse the repository at this point in the history
  • Loading branch information
mcy committed Dec 25, 2024
1 parent f22d4eb commit ffe84e7
Show file tree
Hide file tree
Showing 115 changed files with 16,777 additions and 0 deletions.
1 change: 1 addition & 0 deletions config/plugin_config
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ connector-iceberg
connector-influxdb
connector-iotdb
connector-jdbc
connector-jdbc-mysql
connector-kafka
connector-kudu
connector-maxcompute
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc-v2</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-jdbc-mysql</artifactId>
<name>SeaTunnel : Connectors V2 : Jdbc: MySQL</name>

<properties>
<mysql.version>8.0.27</mysql.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>jdbc-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<createSourcesJar>false</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<!-- rename hikari to avoid jar conflict from spark -->
<relocation>
<pattern>com.zaxxer.hikari</pattern>
<shadedPattern>${seatunnel.shade.package}.com.zaxxer.hikari</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;

import org.apache.seatunnel.shade.com.google.common.base.Preconditions;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;

import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

@Slf4j
public class MySqlCatalog extends AbstractJdbcCatalog {

private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

private static final String SELECT_DATABASE_EXISTS =
"SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

private static final String SELECT_TABLE_EXISTS =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

private MySqlVersion version;
private MySqlTypeConverter typeConverter;

public MySqlCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo, null);
this.version = resolveVersion();
this.typeConverter = new MySqlTypeConverter(version);
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(SELECT_DATABASE_EXISTS, databaseName);
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected String getListDatabaseSql() {
return "SHOW DATABASES;";
}

@Override
protected String getListTableSql(String databaseName) {
return "SHOW TABLES;";
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
return rs.getString(1);
}

@Override
protected String getTableName(TablePath tablePath) {
return tablePath.getTableName();
}

@Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected TableIdentifier getTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
throws SQLException {
List<ConstraintKey> indexList =
super.getConstraintKeys(
metaData,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext(); ) {
ConstraintKey index = it.next();
if (ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType())
&& "PRIMARY".equals(index.getConstraintName())) {
it.remove();
}
}
return indexList;
}

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
// e.g. tinyint(1) unsigned
String columnType = resultSet.getString("COLUMN_TYPE");
// e.g. tinyint
String dataType = resultSet.getString("DATA_TYPE").toUpperCase();
String comment = resultSet.getString("COLUMN_COMMENT");
Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
String isNullableStr = resultSet.getString("IS_NULLABLE");
boolean isNullable = isNullableStr.equals("YES");
// e.g. `decimal(10, 2)` is 10
long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
// e.g. `decimal(10, 2)` is 2
int numberScale = resultSet.getInt("NUMERIC_SCALE");
// e.g. `varchar(10)` is 40
long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
// e.g. `timestamp(3)` is 3
int timePrecision =
MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");

Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));

MysqlType mysqlType = MysqlType.getByName(columnType);
boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned");

BasicTypeDefine<MysqlType> typeDefine =
BasicTypeDefine.<MysqlType>builder()
.name(columnName)
.columnType(columnType)
.dataType(dataType)
.nativeType(mysqlType)
.unsigned(unsigned)
.length(Math.max(charOctetLength, numberPrecision))
.precision(numberPrecision)
.scale(Math.max(numberScale, timePrecision))
.nullable(isNullable)
.defaultValue(defaultValue)
.comment(comment)
.build();
return typeConverter.convert(typeDefine);
}

@Override
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex)
.build(table.getCatalogName());
}

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format(
"DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected String getCreateDatabaseSql(String databaseName) {
return String.format("CREATE DATABASE `%s`;", databaseName);
}

@Override
protected String getDropDatabaseSql(String databaseName) {
return String.format("DROP DATABASE `%s`;", databaseName);
}

@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(
defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter));
}

@Override
protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
return String.format(
"TRUNCATE TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName());
}

public String getExistDataSql(TablePath tablePath) {
return String.format(
"SELECT * FROM `%s`.`%s` LIMIT 1;",
tablePath.getDatabaseName(), tablePath.getTableName());
}

private MySqlVersion resolveVersion() {
try (Statement statement = getConnection(defaultUrl).createStatement();
ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) {
resultSet.next();
return MySqlVersion.parse(resultSet.getString(1));
} catch (Exception e) {
log.info(
"Failed to get mysql version, fallback to default version: {}",
MySqlVersion.V_5_7,
e);
return MySqlVersion.V_5_7;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;

import org.apache.seatunnel.shade.com.google.common.base.Preconditions;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import org.apache.commons.lang3.StringUtils;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class MySqlCatalogFactory implements CatalogFactory {

@Override
public String factoryIdentifier() {
return DatabaseIdentifier.MYSQL;
}

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
Preconditions.checkArgument(
StringUtils.isNoneBlank(urlWithDatabase),
"Miss config <base-url>! Please check your config.");
JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
return new MySqlCatalog(
catalogName,
options.get(JdbcCatalogOptions.USERNAME),
options.get(JdbcCatalogOptions.PASSWORD),
urlInfo);
}

@Override
public OptionRule optionRule() {
return JdbcCatalogOptions.BASE_RULE.build();
}
}
Loading

0 comments on commit ffe84e7

Please sign in to comment.