Skip to content

Commit

Permalink
[Feature][Connector-V2][Jdbc] support gbase 8a (#3026)
Browse files Browse the repository at this point in the history
* gbase 8a connector

* add gbase8a e2e test
  • Loading branch information
liugddx authored Oct 19, 2022
1 parent 63dd9d0 commit dc6e85d
Show file tree
Hide file tree
Showing 14 changed files with 573 additions and 130 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ there are some reference value for params above.
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |

## Example
Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ there are some reference value for params above.
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |

## Example
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class Gbase8aDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Gbase8a";
}

@Override
public JdbcRowConverter getRowConverter() {
return new Gbase8aJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new Gbase8aTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

@AutoService(JdbcDialectFactory.class)
public class Gbase8aDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:gbase:");
}

@Override
public JdbcDialect create() {
return new Gbase8aDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class Gbase8aJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return "Gbase8a";
}

@SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

@Slf4j
public class Gbase8aTypeMapper implements JdbcDialectTypeMapper {

//ref http://www.gbase.cn/down/4419.html
// ============================data types=====================
private static final String GBASE8A_UNKNOWN = "UNKNOWN";

// -------------------------number----------------------------
private static final String GBASE8A_INT = "INT";
private static final String GBASE8A_TINYINT = "TINYINT";
private static final String GBASE8A_SMALLINT = "SMALLINT";
private static final String GBASE8A_BIGINT = "BIGINT";
private static final String GBASE8A_DECIMAL = "DECIMAL";
private static final String GBASE8A_FLOAT = "FLOAT";
private static final String GBASE8A_DOUBLE = "DOUBLE";

// -------------------------string----------------------------
private static final String GBASE8A_CHAR = "CHAR";
private static final String GBASE8A_VARCHAR = "VARCHAR";


// ------------------------------time-------------------------
private static final String GBASE8A_DATE = "DATE";
private static final String GBASE8A_TIME = "TIME";
private static final String GBASE8A_TIMESTAMP = "TIMESTAMP";
private static final String GBASE8A_DATETIME = "DATETIME";

// ------------------------------blob-------------------------
private static final String GBASE8A_BLOB = "BLOB";
private static final String GBASE8A_TEXT = "TEXT";

@SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String gbase8aType = metadata.getColumnTypeName(colIndex).toUpperCase();
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (gbase8aType) {
case GBASE8A_TINYINT:
return BasicType.BYTE_TYPE;
case GBASE8A_SMALLINT:
return BasicType.SHORT_TYPE;
case GBASE8A_INT:
return BasicType.INT_TYPE;
case GBASE8A_BIGINT:
return BasicType.LONG_TYPE;
case GBASE8A_DECIMAL:
if (precision < 38) {
return new DecimalType(precision, scale);
}
return new DecimalType(38, 18);
case GBASE8A_DOUBLE:
return BasicType.DOUBLE_TYPE;
case GBASE8A_FLOAT:
return BasicType.FLOAT_TYPE;
case GBASE8A_CHAR:
case GBASE8A_VARCHAR:
return BasicType.STRING_TYPE;
case GBASE8A_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case GBASE8A_TIME:
return LocalTimeType.LOCAL_TIME_TYPE;
case GBASE8A_TIMESTAMP:
case GBASE8A_DATETIME:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case GBASE8A_BLOB:
case GBASE8A_TEXT:
return PrimitiveByteArrayType.INSTANCE;
//Doesn't support yet
case GBASE8A_UNKNOWN:
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support GBASE8A type '%s' on column '%s' yet.",
gbase8aType, jdbcColumnName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) th
switch (oracleType) {
case ORACLE_INTEGER:
return BasicType.INT_TYPE;
case ORACLE_FLOAT:
case ORACLE_NUMBER:
if (precision < 38) {
return new DecimalType(precision, scale);
}
//The float type will be converted to DecimalType(10, -127),
// which will lose precision in the spark engine
return new DecimalType(38, 18);
case ORACLE_FLOAT:
case ORACLE_BINARY_DOUBLE:
return BasicType.DOUBLE_TYPE;
case ORACLE_BINARY_FLOAT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- jdbc containers -->
<dependency>
Expand Down
Loading

0 comments on commit dc6e85d

Please sign in to comment.