Skip to content

Commit

Permalink
[Feature][Connector-V2][Redshift] Unified exception
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer committed Dec 16, 2022
1 parent c278e89 commit 6eb5b3f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.redshift;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcException;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -35,7 +36,7 @@ public class RedshiftJdbcClient {

private final Connection connection;

public static RedshiftJdbcClient getInstance(Config config) throws S3RedshiftJdbcException {
public static RedshiftJdbcClient getInstance(Config config) throws S3RedshiftJdbcConnectorException {
if (INSTANCE == null) {
synchronized (RedshiftJdbcClient.class) {
if (INSTANCE == null) {
Expand All @@ -45,7 +46,8 @@ public static RedshiftJdbcClient getInstance(Config config) throws S3RedshiftJdb
config.getString(S3RedshiftConfig.JDBC_USER.key()),
config.getString(S3RedshiftConfig.JDBC_PASSWORD.key()));
} catch (SQLException | ClassNotFoundException e) {
throw new S3RedshiftJdbcException("RedshiftJdbcClient init error", e);
throw new S3RedshiftJdbcConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
"RedshiftJdbcClient init error", e);
}
}
}
Expand All @@ -66,7 +68,8 @@ public boolean checkTableExists(String tableName) {
ResultSet rs = meta.getTables(null, null, tableName, type);
flag = rs.next();
} catch (SQLException e) {
throw new S3RedshiftJdbcException(String.format("checkTableExists error, table name is %s ", tableName), e);
throw new S3RedshiftJdbcConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
String.format("Check table is or not existed failed, table name is %s ", tableName), e);
}
return flag;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.redshift.commit;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcException;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand Down Expand Up @@ -99,8 +100,8 @@ public void close() throws IOException {
try {
RedshiftJdbcClient.getInstance(pluginConfig).close();
} catch (SQLException e) {
log.error("close redshift jdbc client error", e);
throw new S3RedshiftJdbcException("close redshift jdbc client error", e);
throw new S3RedshiftJdbcConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
"close redshift jdbc client failed", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@

package org.apache.seatunnel.connectors.seatunnel.redshift.exception;

public class S3RedshiftJdbcException extends RuntimeException {
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public S3RedshiftJdbcException(String message) {
super(message);
public class S3RedshiftJdbcConnectorException extends SeaTunnelRuntimeException {

public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}

public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}

public S3RedshiftJdbcException(String message, Throwable cause) {
super(message, cause);
public S3RedshiftJdbcConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.redshift.sink;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.common.config.CheckConfigUtil;
Expand All @@ -30,6 +31,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.redshift.commit.S3RedshiftSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -50,7 +52,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, S3Config.S3_BUCKET.key(), S3RedshiftConfig.JDBC_URL.key(),
S3RedshiftConfig.JDBC_USER.key(), S3RedshiftConfig.JDBC_PASSWORD.key(), S3RedshiftConfig.EXECUTE_SQL.key());
if (!checkResult.isSuccess()) {
throw new PrepareFailException(checkResult.getMsg(), PluginType.SINK, getPluginName());
throw new S3RedshiftJdbcConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, checkResult.getMsg()));
}
this.pluginConfig = pluginConfig;
hadoopConf = S3Conf.buildWithConfig(pluginConfig);
Expand Down

0 comments on commit 6eb5b3f

Please sign in to comment.