Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2] Close all ResultSet after used #7389

Merged
merged 2 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,24 @@ default Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableI

DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();

// seq -> column name
List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
String pkName = null;

// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
ResultSet rs =
metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table());

// seq -> column name
List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
String pkName = null;
while (rs.next()) {
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
String columnName = rs.getString("COLUMN_NAME");
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
try (ResultSet rs =
metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table())) {
while (rs.next()) {
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
String columnName = rs.getString("COLUMN_NAME");
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
}
}
// initialize size
List<String> pkFields =
Expand Down Expand Up @@ -121,41 +123,42 @@ default List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, Tab
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();

ResultSet resultSet =
try (ResultSet resultSet =
metaData.getIndexInfo(
tableId.catalog(), tableId.schema(), tableId.table(), false, false);
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
if (columnName == null) {
continue;
tableId.catalog(), tableId.schema(), tableId.table(), false, false)) {
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
if (columnName == null) {
continue;
}

String indexName = resultSet.getString("INDEX_NAME");
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
indexName,
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.INDEX_KEY;
if (!noUnique) {
constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
constraintType, indexName, new ArrayList<>());
});

ConstraintKey.ColumnSortType sortType =
"A".equals(resultSet.getString("ASC_OR_DESC"))
? ConstraintKey.ColumnSortType.ASC
: ConstraintKey.ColumnSortType.DESC;
ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
constraintKey.getColumnNames().add(constraintKeyColumn);
}

String indexName = resultSet.getString("INDEX_NAME");
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
indexName,
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.INDEX_KEY;
if (!noUnique) {
constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
constraintType, indexName, new ArrayList<>());
});

ConstraintKey.ColumnSortType sortType =
"A".equals(resultSet.getString("ASC_OR_DESC"))
? ConstraintKey.ColumnSortType.ASC
: ConstraintKey.ColumnSortType.DESC;
ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
constraintKey.getColumnNames().add(constraintKeyColumn);
return new ArrayList<>(constraintKeyMap.values());
}
return new ArrayList<>(constraintKeyMap.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ private boolean clickhouseServerEnableExperimentalLightweightDelete(
return false;
}
String configKey = "allow_experimental_lightweight_delete";
try (Statement stmt = clickhouseConnection.createStatement()) {
ResultSet resultSet = stmt.executeQuery("SHOW SETTINGS ILIKE '%" + configKey + "%'");
try (Statement stmt = clickhouseConnection.createStatement();
ResultSet resultSet =
stmt.executeQuery("SHOW SETTINGS ILIKE '%" + configKey + "%'")) {
while (resultSet.next()) {
String name = resultSet.getString("name");
if (name.equalsIgnoreCase(configKey)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ public void open() throws CatalogException {
private String getDorisVersion() throws SQLException {
String dorisVersion = null;
try (PreparedStatement preparedStatement =
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY)) {
ResultSet resultSet = preparedStatement.executeQuery();
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY);
ResultSet resultSet = preparedStatement.executeQuery()) {

while (resultSet.next()) {
dorisVersion = resultSet.getString(2);
}
Expand Down Expand Up @@ -180,8 +181,9 @@ public String getDefaultDatabase() throws CatalogException {
public boolean databaseExists(String databaseName) throws CatalogException {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.DATABASE_QUERY)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
return rs.next();
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
} catch (SQLException e) {
throw new CatalogException("check database exists failed", e);
}
Expand All @@ -190,8 +192,8 @@ public boolean databaseExists(String databaseName) throws CatalogException {
@Override
public List<String> listDatabases() throws CatalogException {
List<String> databases = new ArrayList<>();
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY)) {
ResultSet rs = ps.executeQuery();
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY);
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String database = rs.getString(1);
databases.add(database);
Expand All @@ -210,10 +212,11 @@ public List<String> listTables(String databaseName)
try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_DATABASE_QUERY)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String table = rs.getString(1);
tables.add(table);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String table = rs.getString(1);
tables.add(table);
}
}
} catch (SQLException e) {
throw new CatalogException(
Expand All @@ -229,8 +232,9 @@ public boolean tableExists(TablePath tablePath) throws CatalogException {
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_IDENTIFIER_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
return rs.next();
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
} catch (SQLException e) {
throw new CatalogException(
String.format("check table [%s] exists failed", tablePath.getFullName()), e);
Expand All @@ -248,18 +252,19 @@ public CatalogTable getTable(TablePath tablePath)
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(
tablePath, rs, builder, options, Collections.emptyList());
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
try (ResultSet rs = ps.executeQuery()) {
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(
tablePath, rs, builder, options, Collections.emptyList());
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
}
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
Expand All @@ -279,17 +284,18 @@ public CatalogTable getTable(TablePath tablePath, List<String> fieldNames)
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(tablePath, rs, builder, options, fieldNames);
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
try (ResultSet rs = ps.executeQuery()) {
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(tablePath, rs, builder, options, fieldNames);
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
}
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
Expand Down Expand Up @@ -480,8 +486,8 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
public boolean isExistsData(TablePath tablePath) {
String tableName = tablePath.getFullName();
String sql = String.format("select * from %s limit 1;", tableName);
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ResultSet resultSet = ps.executeQuery();
try (PreparedStatement ps = conn.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery()) {
return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,9 @@ public interface ResultSetConsumer<T> {

protected List<String> queryString(String url, String sql, ResultSetConsumer<String> consumer)
throws SQLException {
try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
try (PreparedStatement ps = getConnection(url).prepareStatement(sql);
ResultSet rs = ps.executeQuery()) {
List<String> result = new ArrayList<>();
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String value = consumer.apply(rs);
if (value != null) {
Expand Down Expand Up @@ -643,8 +643,9 @@ public boolean isExistsData(TablePath tablePath) {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
Connection connection = getConnection(dbUrl);
String sql = getExistDataSql(tablePath);
try (PreparedStatement ps = connection.prepareStatement(sql)) {
ResultSet resultSet = ps.executeQuery();
try (PreparedStatement ps = connection.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery()) {

return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,12 @@ protected String getDropDatabaseSql(String databaseName) {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
Statement statement = defaultConnection.createStatement();
ResultSetMetaData metaData = statement.executeQuery(sqlQuery).getMetaData();
return CatalogUtils.getCatalogTable(
metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery);
try (Statement statement = defaultConnection.createStatement();
ResultSet resultSet = statement.executeQuery(sqlQuery)) {
ResultSetMetaData metaData = resultSet.getMetaData();
return CatalogUtils.getCatalogTable(
metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery);
}
}

@Override
Expand Down
Loading
Loading