Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat-#1244][sybase] add sybase reader plugin #1245

Merged
merged 2 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions chunjun-connectors/chunjun-connector-sybase/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>chunjun-connectors</artifactId>
<groupId>com.dtstack.chunjun</groupId>
<version>1.12-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>chunjun-connector-sybase</artifactId>
<name>ChunJun : Connectors : sybase</name>

<dependencies>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-jdbc-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sourceforge.jtds</groupId>
<artifactId>jtds</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>log4j:log4j</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<!-- here the phase you need -->
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy todir="${basedir}/../../${dist.dir}/connector/sybase/"
file="${basedir}/target/${project.artifactId}-${project.version}.jar"/>
<move file="${basedir}/../../${dist.dir}/connector/sybase/${project.artifactId}-${project.version}.jar"
tofile="${basedir}/../../${dist.dir}/connector/sybase/${project.artifactId}.jar"/>
<delete>
<fileset dir="${basedir}/../../${dist.dir}/connector/sybase/"
includes="${project.artifactId}-*.jar"
excludes="${project.artifactId}.jar"/>
</delete>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.sybase.converter;

import com.dtstack.chunjun.conf.ChunJunCommonConf;
import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.element.AbstractBaseColumn;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimeColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

/** @Author OT @Date 2022/6/16 17:52 @Version 1.0 */
public class SybaseColumnConverter extends JdbcColumnConverter {
public SybaseColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
super(rowType, commonConf);
}

@Override
protected IDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return val -> {
// compatible with BIT(>1)
if (val instanceof byte[]) {
return new BytesColumn((byte[]) val);
} else {
return new BooleanColumn(Boolean.parseBoolean(val.toString()));
}
};
case TINYINT:
return val -> new BigDecimalColumn(((Integer) val).byteValue());
case SMALLINT:
case INTEGER:
return val -> new BigDecimalColumn((Integer) val);
case INTERVAL_YEAR_MONTH:
return (IDeserializationConverter<Object, AbstractBaseColumn>)
val -> {
YearMonthIntervalType yearMonthIntervalType =
(YearMonthIntervalType) type;
switch (yearMonthIntervalType.getResolution()) {
case YEAR:
return new BigDecimalColumn(
Integer.parseInt(String.valueOf(val).substring(0, 4)));
case MONTH:
case YEAR_TO_MONTH:
default:
throw new UnsupportedOperationException(
"jdbc converter only support YEAR");
}
};
case FLOAT:
return val -> {
if (val instanceof Double) {
BigDecimal b = new BigDecimal(String.valueOf(val));
return new BigDecimalColumn(b.doubleValue());
}
return new BigDecimalColumn((Float) val);
};
case DOUBLE:
return val -> new BigDecimalColumn((Double) val);
case BIGINT:
return val -> {
if (val instanceof Integer) {
return new BigDecimalColumn((Integer) val);
}
return new BigDecimalColumn((Long) val);
};
case DECIMAL:
return val -> {
if (val instanceof BigInteger) {
return new BigDecimalColumn((BigInteger) val);
}
return new BigDecimalColumn((BigDecimal) val);
};
case CHAR:
case VARCHAR:
return val -> new StringColumn((String) val);
case DATE:
return val -> new SqlDateColumn((Date) val);
case TIME_WITHOUT_TIME_ZONE:
return val -> new TimeColumn((Time) val);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (IDeserializationConverter<Object, AbstractBaseColumn>)
val ->
new TimestampColumn(
(Timestamp) val, ((TimestampType) (type)).getPrecision());

case BINARY:
case VARBINARY:
return val -> new BytesColumn((byte[]) val);
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.sybase.converter;

import com.dtstack.chunjun.throwable.UnsupportedTypeException;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Locale;

/** @Author OT @Date 2022/6/13 17:59 @Version 1.0 */
public class SybaseRawTypeConverter {
public static DataType apply(String type) {
switch (type.toUpperCase(Locale.ENGLISH)) {
case "BIGINT":
case "UNSIGNED INT":
return DataTypes.BIGINT();
case "INT":
case "INTEGER":
case "UNSIGNED SMALLINT":
return DataTypes.INT();
case "SMALLINT":
return DataTypes.SMALLINT();
case "TINYINT":
return DataTypes.TINYINT();
case "UNSIGNED BIGINT":
return DataTypes.DECIMAL(20, 0);
case "NUMERIC":
case "DECIMAL":
return DataTypes.DECIMAL(38, 18);
case "NUMERIC IDENTITY":
return DataTypes.DECIMAL(38, 0);
case "FLOAT":
case "REAL":
return DataTypes.FLOAT();
case "DOUBLE":
return DataTypes.DOUBLE();
case "SMALLMONEY":
return DataTypes.DECIMAL(10, 4);
case "MONEY":
return DataTypes.DECIMAL(19, 4);
case "SMALLDATETIME":
return DataTypes.TIMESTAMP(0);
case "DATETIME":
case "BIGDATETIME":
return DataTypes.TIMESTAMP(3);
case "DATE":
return DataTypes.DATE();
case "TIME":
case "BIGTIME":
return DataTypes.TIME();
case "CHAR":
case "VARCHAR":
case "UNICHAR":
case "UNIVARCHAR":
case "NCHAR":
case "NVARCHAR":
case "TEXT":
case "UNITEXT":
case "LONGSYSNAME":
case "STRING":
return DataTypes.STRING();
case "BINARY":
case "TIMESTAMP":
case "VARBINARY":
case "IMAGE":
return DataTypes.BYTES();
case "BIT":
return DataTypes.BOOLEAN();
default:
throw new UnsupportedTypeException(type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.sybase.dialect;

import com.dtstack.chunjun.conf.ChunJunCommonConf;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputSplit;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
import com.dtstack.chunjun.connector.sybase.converter.SybaseColumnConverter;
import com.dtstack.chunjun.connector.sybase.converter.SybaseRawTypeConverter;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.RawTypeConverter;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import io.vertx.core.json.JsonArray;

import java.sql.ResultSet;
import java.util.Optional;

/** @Author OT @Date 2022/6/16 13:54 @Version 1.0 */
public class SybaseDialect implements JdbcDialect {
private static final String DIALECT_NAME = "Sybase";
private static final String DRIVER_NAME = "net.sourceforge.jtds.jdbc.Driver";

@Override
public String dialectName() {
return DIALECT_NAME;
}

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:jtds:sybase:");
}

@Override
public RawTypeConverter getRawTypeConverter() {
return SybaseRawTypeConverter::apply;
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of(DRIVER_NAME);
}

@Override
public String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

@Override
public AbstractRowConverter<ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType>
getColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
return new SybaseColumnConverter(rowType, commonConf);
}

@Override
public String getSplitModFilter(JdbcInputSplit split, String splitPkName) {
return String.format(
"%s %% %s = %s",
quoteIdentifier(splitPkName), split.getTotalNumberOfSplits(), split.getMod());
}
}
Loading