Skip to content

Commit

Permalink
[apache#1707] feat(mysql): Support mysql index.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 29, 2024
1 parent a542b57 commit b4b04f3
Show file tree
Hide file tree
Showing 23 changed files with 641 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public class Indexes {

public static final Index[] EMPTY_INDEXES = new Index[0];

/** MySQL does not support setting the name of the primary key, so the default name is used. */
public static final String DEFAULT_MYSQL_PRIMARY_KEY_NAME = "PRIMARY";

/**
* Create a unique index on columns. Like unique (a) or unique (a, b), for complex like unique
*
Expand All @@ -20,6 +23,16 @@ public static Index unique(String name, String[][] fieldNames) {
return of(Index.IndexType.UNIQUE_KEY, name, fieldNames);
}

/**
* To create a MySQL primary key, you need to use the default primary key name.
*
* @param fieldNames The field names under the table contained in the index.
* @return The primary key index
*/
public static Index createMysqlPrimaryKey(String[][] fieldNames) {
return primary(DEFAULT_MYSQL_PRIMARY_KEY_NAME, fieldNames);
}

/**
* Create a primary index on columns. Like primary (a), for complex like primary
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
// Remove id from comment
.withComment(StringIdentifier.removeIdFromComment(load.comment()))
.withProperties(properties)
.withIndexes(load.index())
.build();
}

Expand Down Expand Up @@ -364,7 +365,6 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
Preconditions.checkArgument(indexes.length == 0, "Jdbc-catalog does not support indexes");
Preconditions.checkArgument(
null == distribution || distribution == Distributions.NONE,
"jdbc-catalog does not support distribution");
Expand Down Expand Up @@ -399,7 +399,8 @@ public Table createTable(
jdbcColumns,
StringIdentifier.addToComment(identifier, comment),
resultProperties,
partitioning);
partitioning,
indexes);

return new JdbcTable.Builder()
.withAuditInfo(
Expand All @@ -412,6 +413,7 @@ public Table createTable(
.withComment(comment)
.withProperties(jdbcTablePropertiesMetadata.convertFromJdbcProperties(resultProperties))
.withPartitioning(partitioning)
.withIndexes(indexes)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected JdbcTable internalBuild() {
jdbcTable.columns = columns;
jdbcTable.partitioning = partitioning;
jdbcTable.sortOrders = sortOrders;
jdbcTable.indexes = indexes;
return jdbcTable;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand Down Expand Up @@ -63,13 +64,14 @@ public void create(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning)
Transform[] partitioning,
Index[] indexes)
throws TableAlreadyExistsException {
LOG.info("Attempting to create table {} in database {}", tableName, databaseName);
try (Connection connection = getConnection(databaseName)) {
JdbcConnectorUtils.executeUpdate(
connection,
generateCreateTableSql(tableName, columns, comment, properties, partitioning));
generateCreateTableSql(tableName, columns, comment, properties, partitioning, indexes));
LOG.info("Created table {} in database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
Expand Down Expand Up @@ -215,7 +217,8 @@ protected abstract String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning);
Transform[] partitioning,
Index[] indexes);

protected abstract String generateRenameTableSql(String oldTableName, String newTableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
Expand Down Expand Up @@ -41,14 +42,16 @@ void initialize(
* @param comment The comment of the table.
* @param properties The properties of the table.
* @param partitioning The partitioning of the table.
* @param indexes The indexes of the table.
*/
void create(
String databaseName,
String tableName,
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning)
Transform[] partitioning,
Index[] indexes)
throws TableAlreadyExistsException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
Expand All @@ -20,7 +21,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("CREATE TABLE ").append(tableName).append(" (");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testOperationTable() {
Assertions.assertDoesNotThrow(
() ->
JDBC_TABLE_OPERATIONS.create(
DATABASE_NAME, table1, jdbcColumns, null, properties, null));
DATABASE_NAME, table1, jdbcColumns, null, properties, null, Indexes.EMPTY_INDEXES));

// list table.
List<String> allTables = JDBC_TABLE_OPERATIONS.listTables(DATABASE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -29,8 +31,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -87,6 +91,9 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
comment = tableProperties.getOrDefault(COMMENT, comment);
}

// 4.Get index information
List<Index> indexes = getIndexes(databaseName, tableName, metaData);

return new JdbcTable.Builder()
.withName(tableName)
.withColumns(jdbcColumns.toArray(new JdbcColumn[0]))
Expand All @@ -102,13 +109,71 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
}
}
}))
.withIndexes(indexes.toArray(new Index[0]))
.withAuditInfo(AuditInfo.EMPTY)
.build();
} catch (SQLException e) {
throw exceptionMapper.toGravitinoException(e);
}
}

private List<Index> getIndexes(String databaseName, String tableName, DatabaseMetaData metaData)
throws SQLException {
List<Index> indexes = new ArrayList<>();
// 1.Get primary key information
Map<String, Set<String>> primaryKeyGroupByName = new HashMap<>();
ResultSet primaryKeys = metaData.getPrimaryKeys(databaseName, null, tableName);
while (primaryKeys.next()) {
String columnName = primaryKeys.getString("COLUMN_NAME");
primaryKeyGroupByName.compute(
primaryKeys.getString("PK_NAME"),
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
for (Map.Entry<String, Set<String>> entry : primaryKeyGroupByName.entrySet()) {
indexes.add(
Indexes.primary(
entry.getKey(), convertIndexFieldNames(entry.getValue().toArray(new String[0]))));
}

// 2.Get unique key information
Map<String, Set<String>> indexGroupByName = new HashMap<>();
ResultSet indexInfo = metaData.getIndexInfo(databaseName, null, tableName, false, false);
while (indexInfo.next()) {
String indexName = indexInfo.getString("INDEX_NAME");
if (!indexInfo.getBoolean("NON_UNIQUE")
&& !StringUtils.equalsIgnoreCase(Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME, indexName)) {
String columnName = indexInfo.getString("COLUMN_NAME");
indexGroupByName.compute(
indexName,
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
}
for (Map.Entry<String, Set<String>> entry : indexGroupByName.entrySet()) {
indexes.add(
Indexes.unique(
entry.getKey(), convertIndexFieldNames(entry.getValue().toArray(new String[0]))));
}
return indexes;
}

private String[][] convertIndexFieldNames(String[] fieldNames) {
return Arrays.stream(fieldNames)
.map(colName -> new String[] {colName})
.toArray(String[][]::new);
}

private Map<String, String> loadTablePropertiesFromSql(Connection connection, String tableName)
throws SQLException {
try (PreparedStatement statement = connection.prepareStatement("SHOW TABLE STATUS LIKE ?")) {
Expand Down Expand Up @@ -178,7 +243,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
if (ArrayUtils.isNotEmpty(partitioning)) {
throw new UnsupportedOperationException("Currently we do not support Partitioning in mysql");
}
Expand All @@ -201,6 +267,41 @@ protected String generateCreateTableSql(
sqlBuilder.append(",\n");
}
}

for (Index index : indexes) {
String fieldStr =
Arrays.stream(index.fieldNames())
.map(
colNames -> {
if (colNames.length > 1) {
throw new IllegalArgumentException(
"Index does not support complex fields in MySQL");
}
return BACK_QUOTE + colNames[0] + BACK_QUOTE;
})
.collect(Collectors.joining(", "));
sqlBuilder.append(",\n");
switch (index.type()) {
case PRIMARY_KEY:
if (null != index.name()
&& !StringUtils.equalsIgnoreCase(
index.name(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
throw new IllegalArgumentException("Primary key name must be PRIMARY in MySQL");
}
sqlBuilder.append("CONSTRAINT ").append("PRIMARY KEY (").append(fieldStr).append(")");
break;
case UNIQUE_KEY:
sqlBuilder.append("CONSTRAINT ");
if (null != index.name()) {
sqlBuilder.append(BACK_QUOTE).append(index.name()).append(BACK_QUOTE);
}
sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")");
break;
default:
throw new IllegalArgumentException("MySQL doesn't support index : " + index.type());
}
}

sqlBuilder.append("\n)");

// Add table comment if specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import java.sql.Connection;
Expand Down Expand Up @@ -225,7 +226,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
if (ArrayUtils.isNotEmpty(partitioning)) {
throw new UnsupportedOperationException(
"Currently we do not support Partitioning in PostgreSQL");
Expand Down
Loading

0 comments on commit b4b04f3

Please sign in to comment.