From 27ff4c230b4d93f056d6428554b4c5b01d60225d Mon Sep 17 00:00:00 2001 From: FlechazoW Date: Thu, 5 Jan 2023 17:02:12 +0800 Subject: [PATCH] [BugFix][Jdbc][#1459] Remove the 'checkConnValid', because some connector doesn't support 'isValid'. --- .../connector/jdbc/sink/JdbcOutputFormat.java | 25 ------------------- .../postgresql/sink/PostgresOutputFormat.java | 3 ++- 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java index af3e0c0a72..e2db65adf5 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java @@ -29,7 +29,6 @@ import com.dtstack.chunjun.enums.EWriteMode; import com.dtstack.chunjun.enums.Semantic; import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; -import com.dtstack.chunjun.throwable.ChunJunRuntimeException; import com.dtstack.chunjun.throwable.WriteRecordException; import com.dtstack.chunjun.util.ExceptionUtil; import com.dtstack.chunjun.util.GsonUtil; @@ -37,7 +36,6 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -181,29 +179,6 @@ protected void writeMultipleRecordsInternal() throws Exception { } } - @Override - public synchronized void writeRecord(RowData rowData) { - checkConnValid(); - super.writeRecord(rowData); - } - - public void checkConnValid() { - try { - LOG.debug("check db connection valid.."); - if (!dbConn.isValid(10)) { - if (Semantic.EXACTLY_ONCE == semantic) { - throw new FlinkRuntimeException( - "jdbc connection is valid!work's semantic is ExactlyOnce.To prevent data loss,we don't try to reopen the connection"); - } - LOG.info("db connection reconnect.."); - dbConn = getConnection(); - stmtProxy.reOpen(dbConn); - } - } catch (Exception e) { - throw new ChunJunRuntimeException("failed to check jdbcConnection valid", e); - } - } - @Override public void preCommit() throws Exception { if (jdbcConf.getRestoreColumnIndex() > -1) { diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java index 8e3f69629a..f39d02b38e 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java @@ -90,7 +90,8 @@ protected void openInternal(int taskNumber, int numTasks) { checkUpsert(); if (rowConverter instanceof JdbcColumnConverter) { if (jdbcDialect.dialectName().equals("PostgresSQL")) { - ((PostgresqlColumnConverter) rowConverter).setConnection((BaseConnection) dbConn); + ((PostgresqlColumnConverter) rowConverter) + .setConnection((BaseConnection) dbConn); } ((PostgresqlColumnConverter) rowConverter).setFieldTypeList(columnTypeList); }