Skip to content

Commit

Permalink
[BugFix][Jdbc][#1459] Remove the 'checkConnValid', because some conne…
Browse files Browse the repository at this point in the history
…ctor doesn't support 'isValid'.
  • Loading branch information
FlechazoW committed Jan 5, 2023
1 parent 95bfcec commit 27ff4c2
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
import com.dtstack.chunjun.enums.EWriteMode;
import com.dtstack.chunjun.enums.Semantic;
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
import com.dtstack.chunjun.util.ExceptionUtil;
import com.dtstack.chunjun.util.GsonUtil;
import com.dtstack.chunjun.util.JsonUtil;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -181,29 +179,6 @@ protected void writeMultipleRecordsInternal() throws Exception {
}
}

@Override
public synchronized void writeRecord(RowData rowData) {
checkConnValid();
super.writeRecord(rowData);
}

public void checkConnValid() {
try {
LOG.debug("check db connection valid..");
if (!dbConn.isValid(10)) {
if (Semantic.EXACTLY_ONCE == semantic) {
throw new FlinkRuntimeException(
"jdbc connection is valid!work's semantic is ExactlyOnce.To prevent data loss,we don't try to reopen the connection");
}
LOG.info("db connection reconnect..");
dbConn = getConnection();
stmtProxy.reOpen(dbConn);
}
} catch (Exception e) {
throw new ChunJunRuntimeException("failed to check jdbcConnection valid", e);
}
}

@Override
public void preCommit() throws Exception {
if (jdbcConf.getRestoreColumnIndex() > -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ protected void openInternal(int taskNumber, int numTasks) {
checkUpsert();
if (rowConverter instanceof JdbcColumnConverter) {
if (jdbcDialect.dialectName().equals("PostgresSQL")) {
((PostgresqlColumnConverter) rowConverter).setConnection((BaseConnection) dbConn);
((PostgresqlColumnConverter) rowConverter)
.setConnection((BaseConnection) dbConn);
}
((PostgresqlColumnConverter) rowConverter).setFieldTypeList(columnTypeList);
}
Expand Down

0 comments on commit 27ff4c2

Please sign in to comment.