diff --git a/flinkx-connectors/flinkx-connector-oceanbase/pom.xml b/flinkx-connectors/flinkx-connector-oceanbase/pom.xml new file mode 100644 index 0000000000..a13629e257 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/pom.xml @@ -0,0 +1,80 @@ + + + + flinkx-connectors + com.dtstack.flinkx + 1.12-SNAPSHOT + + 4.0.0 + + flinkx-connector-oceanbase + FlinkX : Connectors : OceanBase + + + 8 + 8 + + + + com.alipay.oceanbase + oceanbase-client + 1.1.5 + + + com.dtstack.flinkx + flinkx-connector-jdbc-base + ${project.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + false + + + + + + maven-antrun-plugin + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + + diff --git a/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/converter/OceanbaseRawTypeConverter.java b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/converter/OceanbaseRawTypeConverter.java new file mode 100644 index 0000000000..d10622a034 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/converter/OceanbaseRawTypeConverter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dtstack.flinkx.connector.oceanbase.converter; + +import com.dtstack.flinkx.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import java.util.Locale; + +public class OceanbaseRawTypeConverter { + public static DataType apply(String type) { + switch (type.toUpperCase(Locale.ENGLISH)) { + case "BOOLEAN": + return DataTypes.BOOLEAN(); + case "TINYINT": + return DataTypes.TINYINT(); + case "SMALLINT": + return DataTypes.SMALLINT(); + case "MEDIUMINT": + case "INT": + case "INTEGER": + return DataTypes.INT(); + case "BIGINT": + return DataTypes.BIGINT(); + case "FLOAT": + return DataTypes.FLOAT(); + case "DECIMAL": + case "NUMERIC": + return DataTypes.DECIMAL(38, 18); + case "DOUBLE": + return DataTypes.DOUBLE(); + case "DATE": + return DataTypes.DATE(); + case "TIME": + return DataTypes.TIME(); + case "TIMESTAMP": + case "DATETIME": + return DataTypes.TIMESTAMP(0); + case "BIT": + case "TINYBLOB": + case "BLOB": + case "MEDIUMBLOB": + case "LONGBLOB": + case "BINARY": + case "VARBINARY": + // BYTES 底层调用的是VARBINARY最大长度 + return DataTypes.BYTES(); + case "CHAR": + case "VARCHAR": + case "TINYTEXT": + case "TEXT": + case "MEDIUMTEXT": + case "LONGTEXT": + case "ENUM": + case "SET": + return DataTypes.STRING(); + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/dialect/OceanbaseDialect.java b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/dialect/OceanbaseDialect.java new file mode 100644 index 0000000000..707170e058 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/dialect/OceanbaseDialect.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dtstack.flinkx.connector.oceanbase.dialect; + +import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.flinkx.connector.oceanbase.converter.OceanbaseRawTypeConverter; +import com.dtstack.flinkx.converter.RawTypeConverter; + +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; + +public class OceanbaseDialect implements JdbcDialect { + private static final String DIALECT_NAME = "OceanBase"; + private static final String DEFAULT_DRIVER_NAME = "com.alipay.oceanbase.jdbc.Driver"; + + @Override + public String dialectName() { + return DIALECT_NAME; + } + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:oceanbase:"); + } + + @Override + public Optional defaultDriverName() { + return Optional.of(DEFAULT_DRIVER_NAME); + } + + @Override + public RawTypeConverter getRawTypeConverter() { + return OceanbaseRawTypeConverter::apply; + } + + @Override + public String quoteIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + @Override + public Optional getReplaceStatement( + String schema, String tableName, String[] fieldNames) { + String columns = + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String placeholders = + Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", ")); + return Optional.of( + "REPLACE INTO " + + buildTableInfoWithSchema(schema, tableName) + + "(" + + columns + + ")" + + " VALUES (" + + placeholders + + ")"); + } + + @Override + public Optional getUpsertStatement( + String schema, + String tableName, + String[] fieldNames, + String[] uniqueKeyFields, + boolean allReplace) { + String updateClause; + if (allReplace) { + updateClause = + Arrays.stream(fieldNames) + .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")") + .collect(Collectors.joining(", ")); + } else { + updateClause = + Arrays.stream(fieldNames) + .map( + f -> + quoteIdentifier(f) + + "=IFNULL(VALUES(" + + quoteIdentifier(f) + + ")," + + quoteIdentifier(f) + + ")") + .collect(Collectors.joining(", ")); + } + + return Optional.of( + getInsertIntoStatement(schema, tableName, fieldNames) + + " ON DUPLICATE KEY UPDATE " + + updateClause); + } +} diff --git a/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/sink/OceanbaseSinkFactory.java b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/sink/OceanbaseSinkFactory.java new file mode 100644 index 0000000000..4e09ca92ae --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/sink/OceanbaseSinkFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dtstack.flinkx.connector.oceanbase.sink; + +import com.dtstack.flinkx.conf.SyncConf; +import com.dtstack.flinkx.connector.jdbc.sink.JdbcSinkFactory; +import com.dtstack.flinkx.connector.oceanbase.dialect.OceanbaseDialect; + +public class OceanbaseSinkFactory extends JdbcSinkFactory { + public OceanbaseSinkFactory(SyncConf syncConf) { + super(syncConf, new OceanbaseDialect()); + } +} diff --git a/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/source/OceanbaseSourceFactory.java b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/source/OceanbaseSourceFactory.java new file mode 100644 index 0000000000..6a9a37b3e8 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/source/OceanbaseSourceFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dtstack.flinkx.connector.oceanbase.source; + +import com.dtstack.flinkx.conf.SyncConf; +import com.dtstack.flinkx.connector.jdbc.source.JdbcSourceFactory; +import com.dtstack.flinkx.connector.oceanbase.dialect.OceanbaseDialect; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang.StringUtils; + +public class OceanbaseSourceFactory extends JdbcSourceFactory { + public OceanbaseSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { + super(syncConf, env, new OceanbaseDialect()); + if (jdbcConf.isPolling() + && StringUtils.isEmpty(jdbcConf.getStartLocation()) + && jdbcConf.getFetchSize() == 0) { + jdbcConf.setFetchSize(1000); + } + } +} diff --git a/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/table/OceanbaseDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/table/OceanbaseDynamicTableFactory.java new file mode 100644 index 0000000000..65a6d95653 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/src/main/java/com/dtstack/flinkx/connector/oceanbase/table/OceanbaseDynamicTableFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dtstack.flinkx.connector.oceanbase.table; + +import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.flinkx.connector.jdbc.table.JdbcDynamicTableFactory; +import com.dtstack.flinkx.connector.oceanbase.dialect.OceanbaseDialect; + +public class OceanbaseDynamicTableFactory extends JdbcDynamicTableFactory { + private static final String IDENTIFIER = "oceanbase-x"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + protected JdbcDialect getDialect() { + return new OceanbaseDialect(); + } +} diff --git a/flinkx-connectors/flinkx-connector-oceanbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flinkx-connectors/flinkx-connector-oceanbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..2fcb44be5c --- /dev/null +++ b/flinkx-connectors/flinkx-connector-oceanbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.dtstack.flinkx.connector.oceanbase.table.OceanbaseDynamicTableFactory diff --git a/flinkx-connectors/pom.xml b/flinkx-connectors/pom.xml index bb75fc1b41..092f833df3 100644 --- a/flinkx-connectors/pom.xml +++ b/flinkx-connectors/pom.xml @@ -41,6 +41,7 @@ flinkx-connector-doris flinkx-connector-influxdb flinkx-connector-starrocks + flinkx-connector-oceanbase flinkx-connector-file diff --git a/flinkx-local-test/pom.xml b/flinkx-local-test/pom.xml index be08fde529..6f0fe0f8c9 100644 --- a/flinkx-local-test/pom.xml +++ b/flinkx-local-test/pom.xml @@ -206,6 +206,16 @@ flinkx-connector-saphana ${project.version} + + com.dtstack.flinkx + flinkx-connector-inceptor + 1.12-SNAPSHOT + + + com.dtstack.flinkx + flinkx-connector-oceanbase + 1.12-SNAPSHOT +