diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index b30005aea2e..1803a6a7611 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -164,3 +164,4 @@ seatunnel.sink.Doris = connector-doris
 seatunnel.source.Maxcompute = connector-maxcompute
 seatunnel.sink.Maxcompute = connector-maxcompute
 seatunnel.source.MySQL-CDC = connector-cdc-mysql
+seatunnel.sink.S3Redshift = connector-s3-redshift
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/pom.xml b/seatunnel-connectors-v2/connector-s3-redshift/pom.xml
new file mode 100644
index 00000000000..38c7576c387
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-s3-redshift</artifactId>
+
+    <properties>
+        <redshift.version>2.1.0.9</redshift.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-base-hadoop</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-hadoop-2</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-s3</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazon.redshift</groupId>
+            <artifactId>redshift-jdbc42</artifactId>
+            <version>${redshift.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
new file mode 100644
index 00000000000..b1d5e5a1e17
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redshift;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
+import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class RedshiftJdbcClient {
+
+    private static volatile RedshiftJdbcClient INSTANCE = null;
+
+    private final Connection connection;
+
+    public static RedshiftJdbcClient getInstance(Config config) throws S3RedshiftJdbcConnectorException {
+        if (INSTANCE == null) {
+            synchronized (RedshiftJdbcClient.class) {
+                if (INSTANCE == null) {
+
+                    try {
+                        INSTANCE = new RedshiftJdbcClient(config.getString(S3RedshiftConfig.JDBC_URL.key()),
+                            config.getString(S3RedshiftConfig.JDBC_USER.key()),
+                            config.getString(S3RedshiftConfig.JDBC_PASSWORD.key()));
+                    } catch (SQLException | ClassNotFoundException e) {
+                        throw new S3RedshiftJdbcConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
+                            "RedshiftJdbcClient init error", e);
+                    }
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    private RedshiftJdbcClient(String url, String user, String password) throws SQLException, ClassNotFoundException {
+        Class.forName("com.amazon.redshift.jdbc42.Driver");
+        this.connection = DriverManager.getConnection(url, user, password);
+    }
+
+    public boolean checkTableExists(String tableName) {
+        boolean flag = false;
+        try {
+            DatabaseMetaData meta = connection.getMetaData();
+            String[] type = {"TABLE"};
+            ResultSet rs = meta.getTables(null, null, tableName, type);
+            flag = rs.next();
+        } catch (SQLException e) {
+            throw new S3RedshiftJdbcConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+                String.format("Failed to check whether table %s exists", tableName), e);
+        }
+        return flag;
+    }
+
+    public boolean execute(String sql) throws Exception {
+        try (Statement statement = connection.createStatement()) {
+            return statement.execute(sql);
+        }
+    }
+
+    public synchronized void close() throws SQLException {
+        connection.close();
+
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
new file mode 100644
index 00000000000..6e8267451aa
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redshift.commit;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
+import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
+import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class S3RedshiftSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
+
+    private final String executeSql;
+
+    private Config pluginConfig;
+
+    public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config pluginConfig) {
+        super(hadoopConf);
+        this.pluginConfig = pluginConfig;
+        this.executeSql = pluginConfig.getString(S3RedshiftConfig.EXECUTE_SQL.key());
+    }
+
+    @Override
+    public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
+        List<FileAggregatedCommitInfo> errorAggregatedCommitInfoList = new ArrayList<>();
+        aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
+            try {
+                for (Map.Entry<String, Map<String, String>> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) {
+                    for (Map.Entry<String, String> tmpFileEntry : entry.getValue().entrySet()) {
+                        String sql = convertSql(tmpFileEntry.getKey());
+                        log.debug("Executing Redshift sql: " + sql);
+                        RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
+                        try {
+                            FileSystemUtils.deleteFile(tmpFileEntry.getKey());
+                        } catch (IOException e) {
+                            log.warn("Failed to delete tmp file: " + tmpFileEntry.getKey());
+                        }
+                    }
+
+                }
+
+            } catch (Exception e) {
+                log.error("commit aggregatedCommitInfo error ", e);
+                errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
+            }
+        });
+        return errorAggregatedCommitInfoList;
+    }
+
+    @Override
+    public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
+        if (aggregatedCommitInfos == null || aggregatedCommitInfos.isEmpty()) {
+            return;
+        }
+        aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
+            try {
+                for (Map.Entry<String, Map<String, String>> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) {
+                    // delete the transaction dir
+                    FileSystemUtils.deleteFile(entry.getKey());
+                }
+            } catch (Exception e) {
+                log.error("abort aggregatedCommitInfo error ", e);
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        try {
+            RedshiftJdbcClient.getInstance(pluginConfig).close();
+        } catch (SQLException e) {
+            throw new S3RedshiftJdbcConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
+                "Failed to close Redshift JDBC client", e);
+        }
+    }
+
+    private String convertSql(String path) {
+        return StringUtils.replace(executeSql, "${path}", path);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfig.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfig.java
new file mode 100644
index 00000000000..ef2f5d18df0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redshift.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+
+public class S3RedshiftConfig extends S3Config {
+
+    public static final Option<String> JDBC_URL = Options.key("jdbc_url").stringType().noDefaultValue().withDescription("Redshift JDBC URL");
+
+    public static final Option<String> JDBC_USER = Options.key("jdbc_user").stringType().noDefaultValue().withDescription("Redshift JDBC user");
+
+    public static final Option<String> JDBC_PASSWORD = Options.key("jdbc_password").stringType().noDefaultValue().withDescription("Redshift JDBC password");
+
+    public static final Option<String> EXECUTE_SQL = Options.key("execute_sql").stringType().noDefaultValue().withDescription("Redshift execute sql");
+
+}
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftJdbcConnectorException.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftJdbcConnectorException.java
new file mode 100644
index 00000000000..731ef7a8fd0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftJdbcConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redshift.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class S3RedshiftJdbcConnectorException extends SeaTunnelRuntimeException {
+
+    public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
new file mode 100644
index 00000000000..696fd03a3fc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class S3RedshiftFactory implements TableSinkFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "S3Redshift";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(S3Config.S3_BUCKET, S3RedshiftConfig.JDBC_URL, S3RedshiftConfig.JDBC_USER, S3RedshiftConfig.JDBC_PASSWORD, S3RedshiftConfig.EXECUTE_SQL, BaseSourceConfig.FILE_PATH)
+            .optional(S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
+            .optional(BaseSinkConfig.FILE_FORMAT)
+            .optional(BaseSinkConfig.FILENAME_TIME_FORMAT)
+            .optional(BaseSinkConfig.FIELD_DELIMITER)
+            .optional(BaseSinkConfig.ROW_DELIMITER)
+            .optional(BaseSinkConfig.PARTITION_BY)
+            .optional(BaseSinkConfig.PARTITION_DIR_EXPRESSION)
+            .optional(BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE)
+            .optional(BaseSinkConfig.SINK_COLUMNS)
+            .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION)
+            .optional(BaseSinkConfig.FILE_NAME_EXPRESSION)
+            .build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
new file mode 100644
index 00000000000..06debc29551
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.redshift.commit.S3RedshiftSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
+import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(SeaTunnelSink.class)
+public class S3RedshiftSink extends BaseHdfsFileSink {
+
+    @Override
+    public String getPluginName() {
+        return "S3Redshift";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, S3Config.S3_BUCKET.key(), S3RedshiftConfig.JDBC_URL.key(),
+            S3RedshiftConfig.JDBC_USER.key(), S3RedshiftConfig.JDBC_PASSWORD.key(), S3RedshiftConfig.EXECUTE_SQL.key());
+        if (!checkResult.isSuccess()) {
+            throw new S3RedshiftJdbcConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SINK, checkResult.getMsg()));
+        }
+        this.pluginConfig = pluginConfig;
+        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>> createAggregatedCommitter() {
+        return Optional.of(new S3RedshiftSinkAggregatedCommitter(hadoopConf, pluginConfig));
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 3526fb97c3b..ff56c817bb3 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -60,6 +60,7 @@
         <module>connector-amazondynamodb</module>
         <module>connector-tablestore</module>
         <module>connector-cassandra</module>
+        <module>connector-s3-redshift</module>
         <module>connector-starrocks</module>
         <module>connector-google-sheets</module>
         <module>connector-slack</module>
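
Editor's note: a minimal, illustrative SeaTunnel job snippet for the new sink is sketched below. The jdbc_url, jdbc_user, jdbc_password and execute_sql options come from S3RedshiftConfig above; the bucket/path/credential keys are assumed from the connector-file-s3 base options, and every value is a placeholder. The aggregated committer replaces ${path} in execute_sql with each committed staging file's path (see convertSql) before running the statement against Redshift.

    sink {
      S3Redshift {
        # S3 staging location (keys assumed from the file-s3 connector options)
        bucket = "s3a://example-bucket"
        path = "/tmp/seatunnel/orders"
        access_key = "xxxxxxxxxxxxxxxx"
        secret_key = "xxxxxxxxxxxxxxxx"
        # Redshift connection options defined in S3RedshiftConfig
        jdbc_url = "jdbc:redshift://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev"
        jdbc_user = "redshift_user"
        jdbc_password = "redshift_password"
        # ${path} is substituted with the committed file path by S3RedshiftSinkAggregatedCommitter
        execute_sql = "COPY orders FROM 's3://example-bucket${path}' IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-s3-read' FORMAT AS ORC"
      }
    }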