Skip to content

Commit

Permalink
[Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y authored May 12, 2023
1 parent cd9e34b commit 34a7f3e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),
DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
Expand All @@ -57,6 +58,8 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@AutoService(SeaTunnelSink.class)
public class JdbcSink
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
Expand Down Expand Up @@ -195,18 +198,22 @@ public void handleSaveMode(DataSaveMode saveMode) {
if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
return;
}
Catalog catalog =
try (Catalog catalog =
new TiDBCatalogFactory()
.createCatalog(
TiDBCatalogFactory.IDENTIFIER,
ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
TablePath tablePath =
TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
catalog.createDatabase(tablePath, true);
}
if (!catalog.tableExists(tablePath)) {
catalog.createTable(tablePath, catalogTable, true);
ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) {
catalog.open();
TablePath tablePath =
TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
catalog.createDatabase(tablePath, true);
}
if (!catalog.tableExists(tablePath)) {
catalog.createTable(tablePath, catalogTable, true);
}
} catch (Exception e) {
throw new JdbcConnectorException(HANDLE_SAVE_MODE_FAILED, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public OptionRule optionRule() {
TRANSACTION_TIMEOUT_SEC)
.conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
.conditional(GENERATE_SINK_SQL, true, DATABASE)
.conditional(GENERATE_SINK_SQL, true, TABLE)
.conditional(GENERATE_SINK_SQL, false, QUERY)
.build();
}
Expand Down

0 comments on commit 34a7f3e

Please sign in to comment.