Skip to content

Commit

Permalink
[feat-760][oceanbase]support oceanbase mysql tenant connector
Browse files Browse the repository at this point in the history
  • Loading branch information
qingxing authored and Paddy0523 committed May 2, 2022
1 parent ca01de7 commit 72e76ae
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 0 deletions.
80 changes: 80 additions & 0 deletions flinkx-connectors/flinkx-connector-oceanbase/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flinkx-connectors</artifactId>
<groupId>com.dtstack.flinkx</groupId>
<version>1.12-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flinkx-connector-oceanbase</artifactId>
<name>FlinkX : Connectors : OceanBase</name>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alipay.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.dtstack.flinkx</groupId>
<artifactId>flinkx-connector-jdbc-base</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<!-- here the phase you need -->
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy todir="${basedir}/../../${dist.dir}/connector/oceanbase/"
file="${basedir}/target/${project.artifactId}-${project.version}.jar"/>
<!--suppress UnresolvedMavenProperty -->
<move file="${basedir}/../../${dist.dir}/connector/oceanbase/${project.artifactId}-${project.version}.jar"
tofile="${basedir}/../../${dist.dir}/connector/oceanbase/${project.artifactId}-${git.branch}.jar"/>
<delete>
<!--suppress UnresolvedMavenProperty -->
<fileset dir="${basedir}/../../${dist.dir}/connector/oceanbase/"
includes="${project.artifactId}-*.jar"
excludes="${project.artifactId}-${git.branch}.jar"/>
</delete>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.flinkx.connector.oceanbase.converter;

import com.dtstack.flinkx.throwable.UnsupportedTypeException;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Locale;

public class OceanbaseRawTypeConverter {
public static DataType apply(String type) {
switch (type.toUpperCase(Locale.ENGLISH)) {
case "BOOLEAN":
return DataTypes.BOOLEAN();
case "TINYINT":
return DataTypes.TINYINT();
case "SMALLINT":
return DataTypes.SMALLINT();
case "MEDIUMINT":
case "INT":
case "INTEGER":
return DataTypes.INT();
case "BIGINT":
return DataTypes.BIGINT();
case "FLOAT":
return DataTypes.FLOAT();
case "DECIMAL":
case "NUMERIC":
return DataTypes.DECIMAL(38, 18);
case "DOUBLE":
return DataTypes.DOUBLE();
case "DATE":
return DataTypes.DATE();
case "TIME":
return DataTypes.TIME();
case "TIMESTAMP":
case "DATETIME":
return DataTypes.TIMESTAMP(0);
case "BIT":
case "TINYBLOB":
case "BLOB":
case "MEDIUMBLOB":
case "LONGBLOB":
case "BINARY":
case "VARBINARY":
// BYTES 底层调用的是VARBINARY最大长度
return DataTypes.BYTES();
case "CHAR":
case "VARCHAR":
case "TINYTEXT":
case "TEXT":
case "MEDIUMTEXT":
case "LONGTEXT":
case "ENUM":
case "SET":
return DataTypes.STRING();
default:
throw new UnsupportedTypeException(type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.flinkx.connector.oceanbase.dialect;

import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.flinkx.connector.oceanbase.converter.OceanbaseRawTypeConverter;
import com.dtstack.flinkx.converter.RawTypeConverter;

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

public class OceanbaseDialect implements JdbcDialect {
private static final String DIALECT_NAME = "OceanBase";
private static final String DEFAULT_DRIVER_NAME = "com.alipay.oceanbase.jdbc.Driver";

@Override
public String dialectName() {
return DIALECT_NAME;
}

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oceanbase:");
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of(DEFAULT_DRIVER_NAME);
}

@Override
public RawTypeConverter getRawTypeConverter() {
return OceanbaseRawTypeConverter::apply;
}

@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}

@Override
public Optional<String> getReplaceStatement(
String schema, String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return Optional.of(
"REPLACE INTO "
+ buildTableInfoWithSchema(schema, tableName)
+ "("
+ columns
+ ")"
+ " VALUES ("
+ placeholders
+ ")");
}

@Override
public Optional<String> getUpsertStatement(
String schema,
String tableName,
String[] fieldNames,
String[] uniqueKeyFields,
boolean allReplace) {
String updateClause;
if (allReplace) {
updateClause =
Arrays.stream(fieldNames)
.map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
} else {
updateClause =
Arrays.stream(fieldNames)
.map(
f ->
quoteIdentifier(f)
+ "=IFNULL(VALUES("
+ quoteIdentifier(f)
+ "),"
+ quoteIdentifier(f)
+ ")")
.collect(Collectors.joining(", "));
}

return Optional.of(
getInsertIntoStatement(schema, tableName, fieldNames)
+ " ON DUPLICATE KEY UPDATE "
+ updateClause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.flinkx.connector.oceanbase.sink;

import com.dtstack.flinkx.conf.SyncConf;
import com.dtstack.flinkx.connector.jdbc.sink.JdbcSinkFactory;
import com.dtstack.flinkx.connector.oceanbase.dialect.OceanbaseDialect;

public class OceanbaseSinkFactory extends JdbcSinkFactory {
public OceanbaseSinkFactory(SyncConf syncConf) {
super(syncConf, new OceanbaseDialect());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.flinkx.connector.oceanbase.source;

import com.dtstack.flinkx.conf.SyncConf;
import com.dtstack.flinkx.connector.jdbc.source.JdbcSourceFactory;
import com.dtstack.flinkx.connector.oceanbase.dialect.OceanbaseDialect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.commons.lang.StringUtils;

public class OceanbaseSourceFactory extends JdbcSourceFactory {
public OceanbaseSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) {
super(syncConf, env, new OceanbaseDialect());
if (jdbcConf.isPolling()
&& StringUtils.isEmpty(jdbcConf.getStartLocation())
&& jdbcConf.getFetchSize() == 0) {
jdbcConf.setFetchSize(1000);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.flinkx.connector.oceanbase.table;

import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.flinkx.connector.jdbc.table.JdbcDynamicTableFactory;
import com.dtstack.flinkx.connector.oceanbase.dialect.OceanbaseDialect;

public class OceanbaseDynamicTableFactory extends JdbcDynamicTableFactory {
private static final String IDENTIFIER = "oceanbase-x";

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
protected JdbcDialect getDialect() {
return new OceanbaseDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

com.dtstack.flinkx.connector.oceanbase.table.OceanbaseDynamicTableFactory
1 change: 1 addition & 0 deletions flinkx-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<module>flinkx-connector-doris</module>
<module>flinkx-connector-influxdb</module>
<module>flinkx-connector-starrocks</module>
<module>flinkx-connector-oceanbase</module>

<!--File-->
<module>flinkx-connector-file</module>
Expand Down
Loading

0 comments on commit 72e76ae

Please sign in to comment.