Skip to content

Commit

Permalink
[#4988] fix(doris-catalog): Fix the missing distribution information …
Browse files Browse the repository at this point in the history
…when loading Doris tables (#5048)

### What changes were proposed in this pull request?

Loading distribution information when obtaining Doris tables.

### Why are the changes needed?

Without this change, tables loaded from the Doris catalog are returned without their distribution information.

Fix: #4988 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

IT.

Co-authored-by: Qi Yu <yuqi@datastrato.com>
  • Loading branch information
github-actions[bot] and yuqi1129 authored Sep 30, 2024
1 parent 0395854 commit 820daed
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,23 @@ public enum Strategy {
* @return The distribution strategy.
*/
public static Strategy getByName(String name) {
for (Strategy strategy : Strategy.values()) {
if (strategy.name().equalsIgnoreCase(name)) {
return strategy;
}
String upperName = name.toUpperCase();
switch (upperName) {
case "NONE":
return NONE;
case "HASH":
return HASH;
case "RANGE":
return RANGE;
case "EVEN":
case "RANDOM":
return EVEN;
default:
throw new IllegalArgumentException(
"Invalid distribution strategy: "
+ name
+ ". Valid values are: "
+ Arrays.toString(Strategy.values()));
}
throw new IllegalArgumentException(
"Invalid distribution strategy: "
+ name
+ ". Valid values are: "
+ Arrays.toString(Strategy.values()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
.withAuditInfo(load.auditInfo())
.withComment(comment)
.withProperties(properties)
.withDistribution(load.distribution())
.withIndexes(load.index())
.withPartitioning(load.partitioning())
.withDatabaseName(databaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ protected JdbcTable internalBuild() {
jdbcTable.auditInfo = auditInfo;
jdbcTable.columns = columns;
jdbcTable.partitioning = partitioning;
jdbcTable.distribution = distribution;
jdbcTable.sortOrders = sortOrders;
jdbcTable.indexes = indexes;
jdbcTable.proxyPlugin = proxyPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
Expand Down Expand Up @@ -204,11 +205,15 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
Transform[] tablePartitioning = getTablePartitioning(connection, databaseName, tableName);
jdbcTableBuilder.withPartitioning(tablePartitioning);

// 5.Get table properties
// 5.Get distribution information
Distribution distribution = getDistributionInfo(connection, databaseName, tableName);
jdbcTableBuilder.withDistribution(distribution);

// 6.Get table properties
Map<String, String> tableProperties = getTableProperties(connection, tableName);
jdbcTableBuilder.withProperties(tableProperties);

// 6.Leave the information to the bottom layer to append the table
// 7.Leave the information to the bottom layer to append the table
correctJdbcTableFields(connection, databaseName, tableName, jdbcTableBuilder);

return jdbcTableBuilder.withTableOperation(this).build();
Expand Down Expand Up @@ -236,6 +241,20 @@ protected Transform[] getTablePartitioning(
return Transforms.EMPTY_TRANSFORM;
}

/**
* Get the distribution information of the table, including the distribution strategy and the
* distribution fields. This default implementation does not use any of its arguments and always
* returns {@code Distributions.NONE}; the method is protected and non-final so that subclasses
* can override it.
*
* @param connection jdbc connection.
* @param databaseName database name.
* @param tableName table name.
* @return Returns the distribution information of the table.
* @throws SQLException if an error occurs while getting the distribution information.
*/
protected Distribution getDistributionInfo(
Connection connection, String databaseName, String tableName) throws SQLException {
return Distributions.NONE;
}

/**
* Reads the {@code IS_AUTOINCREMENT} column of the given result set as a boolean.
*
* @param resultSet the result set to read from; presumably a column-metadata row, to be
*     confirmed against the caller.
* @return the boolean value of the {@code IS_AUTOINCREMENT} column.
* @throws SQLException if the column cannot be read from the result set.
*/
protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException {
return resultSet.getBoolean("IS_AUTOINCREMENT");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]

Preconditions.checkArgument(
Strategy.HASH == distribution.strategy() || Strategy.EVEN == distribution.strategy(),
"Doris only supports HASH or EVEN distribution strategy");
"Doris only supports HASH or EVEN(RANDOM) distribution strategy");

if (distribution.strategy() == Strategy.HASH) {
// Check if the distribution column exists
Expand All @@ -235,6 +235,10 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]
"Distribution column "
+ expression
+ " does not exist in the table columns"));
} else if (distribution.strategy() == Strategy.EVEN) {
Preconditions.checkArgument(
distribution.expressions().length == 0,
"Doris does not support distribution column in EVEN distribution strategy");
}
}

Expand Down Expand Up @@ -806,4 +810,17 @@ static String deleteIndexDefinition(
}
return "DROP INDEX " + deleteIndex.getName();
}

/**
 * Loads the distribution of a Doris table by running {@code SHOW CREATE TABLE} and parsing the
 * returned DDL with {@link DorisUtils#extractDistributionInfoFromSql(String)}.
 *
 * @param connection jdbc connection used to run the statement.
 * @param databaseName database name; only used in the error message.
 * @param tableName table name.
 * @return the distribution parsed from the table's DDL.
 * @throws SQLException if the statement fails or returns no row.
 */
@Override
protected Distribution getDistributionInfo(
    Connection connection, String databaseName, String tableName) throws SQLException {
  // NOTE(review): the statement is not qualified with databaseName; presumably the connection
  // is already bound to that database - confirm against the caller.
  String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`", tableName);
  try (Statement statement = connection.createStatement();
      ResultSet result = statement.executeQuery(showCreateTableSql)) {
    // Check the cursor explicitly instead of letting getString() fail with a driver-specific
    // message when the result set is empty.
    if (!result.next()) {
      throw new SQLException(
          String.format(
              "'%s' returned no result for table %s.%s",
              showCreateTableSql, databaseName, tableName));
    }
    String createTableSyntax = result.getString("Create Table");
    return DorisUtils.extractDistributionInfoFromSql(createTableSyntax);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions.DistributionImpl;
import org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.transforms.Transform;
Expand All @@ -40,6 +45,11 @@ public final class DorisUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(DorisUtils.class);
// Matches "PARTITION BY LIST(...)" or "PARTITION BY RANGE(...)".
// Group 1 is the partition type (LIST or RANGE); group 2 is the text between the parentheses.
private static final Pattern PARTITION_INFO_PATTERN =
Pattern.compile("PARTITION BY \\b(LIST|RANGE)\\b\\((.+)\\)");

// Matches the "DISTRIBUTED BY ..." clause of a CREATE TABLE statement (case-sensitive).
// Group 1 is the distribution type (HASH or RANDOM).
// Group 3 is the text inside the optional parentheses (the column list); null when absent.
// Group 5 is the digits following the optional "BUCKETS" keyword; null when absent.
private static final Pattern DISTRIBUTION_INFO_PATTERN =
Pattern.compile(
"DISTRIBUTED BY\\s+(HASH|RANDOM)\\s*(\\(([^)]+)\\))?\\s*(BUCKETS\\s+(\\d+))?");

// The two partition type names that group 1 of PARTITION_INFO_PATTERN can capture.
private static final String LIST_PARTITION = "LIST";
private static final String RANGE_PARTITION = "RANGE";

Expand Down Expand Up @@ -176,4 +186,38 @@ private static String generateListPartitionSqlValues(ListPartition listPartition
}
return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
}

/**
 * Extracts the distribution (strategy, distribution columns and bucket number) from a {@code
 * CREATE TABLE} statement by matching its {@code DISTRIBUTED BY ...} clause.
 *
 * @param createTableSql the {@code CREATE TABLE} statement to parse; must not be null.
 * @return the distribution described by the statement. The bucket number defaults to 1 when the
 *     statement has no {@code BUCKETS <integer>} clause, and the expressions are empty when no
 *     column list is present.
 * @throws RuntimeException if the statement contains no recognizable {@code DISTRIBUTED BY}
 *     clause.
 */
public static Distribution extractDistributionInfoFromSql(String createTableSql) {
  Objects.requireNonNull(createTableSql, "createTableSql must not be null");

  Matcher matcher = DISTRIBUTION_INFO_PATTERN.matcher(createTableSql.trim());
  if (!matcher.find()) {
    throw new RuntimeException("Failed to extract distribution info in sql:" + createTableSql);
  }

  String distributionType = matcher.group(1);

  // For Random distribution, no need to specify distribution columns, so group 3 may be null.
  String distributionColumns = matcher.group(3);
  NamedReference[] expressions =
      distributionColumns == null
          ? new NamedReference[0]
          : Arrays.stream(distributionColumns.split(","))
              .map(String::trim)
              // Strip the surrounding backticks only when they are really there, so that an
              // unquoted column name is not truncated.
              .map(
                  f ->
                      f.length() >= 2 && f.startsWith("`") && f.endsWith("`")
                          ? f.substring(1, f.length() - 1)
                          : f)
              .map(col -> NamedReference.field(new String[] {col}))
              .toArray(NamedReference[]::new);

  // Default bucket number is 1. Group 5 is null when the statement has no "BUCKETS <integer>"
  // clause; it must be tested with group(5), because find(int) would reset the matcher and
  // search again from that character index.
  String bucketNumText = matcher.group(5);
  int bucketNum = bucketNumText == null ? 1 : Integer.parseInt(bucketNumText);

  return new DistributionImpl.Builder()
      .withStrategy(Strategy.getByName(distributionType))
      .withNumber(bucketNum)
      .withExpressions(expressions)
      .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
Expand All @@ -73,6 +74,7 @@
import org.apache.gravitino.utils.RandomNameUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -923,4 +925,45 @@ void testNonPartitionedTable() {
assertThrows(
UnsupportedOperationException.class, () -> tablePartitionOperations.dropPartition("p1"));
}

/**
 * Creates one table per distribution, loads it back through the catalog and checks that the
 * loaded strategy and expressions equal the ones used at creation time.
 */
@Test
void testAllDistribution() {
  // EVEN and HASH distributions with different bucket numbers and one or two hash columns.
  Distribution[] candidates = {
    Distributions.even(1, Expression.EMPTY_EXPRESSION),
    Distributions.hash(1, NamedReference.field(DORIS_COL_NAME1)),
    Distributions.even(10, Expression.EMPTY_EXPRESSION),
    Distributions.hash(0, NamedReference.field(DORIS_COL_NAME1)),
    Distributions.hash(11, NamedReference.field(DORIS_COL_NAME1)),
    Distributions.hash(
        12, NamedReference.field(DORIS_COL_NAME1), NamedReference.field(DORIS_COL_NAME2))
  };

  for (int i = 0; i < candidates.length; i++) {
    Distribution expected = candidates[i];

    // A fresh, randomly named table for every distribution.
    String randomTableName = GravitinoITUtils.genRandomName("test_distribution_table");
    NameIdentifier ident = NameIdentifier.of(schemaName, randomTableName);
    Column[] tableColumns = createColumns();
    Map<String, String> tableProperties = createTableProperties();
    TableCatalog tables = catalog.asTableCatalog();

    tables.createTable(
        ident,
        tableColumns,
        table_comment,
        tableProperties,
        Transforms.EMPTY_TRANSFORM,
        expected,
        null,
        Indexes.EMPTY_INDEXES);

    // Load the table back and compare the distribution with the one it was created with.
    Table loaded = tables.loadTable(ident);
    Distribution actual = loaded.distribution();
    Assertions.assertEquals(expected.strategy(), actual.strategy());
    Assertions.assertArrayEquals(expected.expressions(), actual.expressions());

    tables.dropTable(ident);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
Expand Down Expand Up @@ -94,6 +95,54 @@ private static Map<String, String> createProperties() {
return properties;
}

/**
 * Creates a three-column table for each distribution, loads it back through the table
 * operations and checks that the loaded strategy and expressions equal the ones used at
 * creation time.
 */
@Test
void testAllDistribution() {
  // EVEN and HASH distributions with different bucket numbers and one or two hash columns.
  Distribution[] candidates = {
    Distributions.even(DEFAULT_BUCKET_SIZE, Expression.EMPTY_EXPRESSION),
    Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")),
    Distributions.even(10, Expression.EMPTY_EXPRESSION),
    Distributions.hash(0, NamedReference.field("col_1")),
    Distributions.hash(11, NamedReference.field("col_1")),
    Distributions.hash(12, NamedReference.field("col_1"), NamedReference.field("col_2"))
  };

  for (Distribution expected : candidates) {
    String tableName = GravitinoITUtils.genRandomName("doris_basic_test_table");
    String tableComment = "test_comment";

    List<JdbcColumn> columns = new ArrayList<>();
    columns.add(JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build());
    columns.add(
        JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build());
    columns.add(
        JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build());

    // The table is created with createProperties(), while an empty map and an empty index
    // array are what assertionsTableInfo receives below.
    Map<String, String> expectedProperties = new HashMap<>();
    Index[] noIndexes = new Index[] {};

    // create table
    TABLE_OPERATIONS.create(
        databaseName,
        tableName,
        columns.toArray(new JdbcColumn[0]),
        tableComment,
        createProperties(),
        null,
        expected,
        noIndexes);

    JdbcTable loaded = TABLE_OPERATIONS.load(databaseName, tableName);
    assertionsTableInfo(
        tableName,
        tableComment,
        columns,
        expectedProperties,
        noIndexes,
        Transforms.EMPTY_TRANSFORM,
        loaded);

    // The loaded distribution must match the one the table was created with.
    Distribution actual = loaded.distribution();
    Assertions.assertEquals(expected.strategy(), actual.strategy());
    Assertions.assertArrayEquals(expected.expressions(), actual.expressions());

    TABLE_OPERATIONS.drop(databaseName, tableName);
  }
}

@Test
public void testBasicTableOperation() {
String tableName = GravitinoITUtils.genRandomName("doris_basic_test_table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void validateFieldExistence(ColumnDTO[] columns, String[] fieldNam
.filter(c -> c.name().equalsIgnoreCase(fieldName[0]))
.collect(Collectors.toList());
Preconditions.checkArgument(
partitionColumn.size() == 1, "partition field %s not found in table", fieldName[0]);
partitionColumn.size() == 1, "Field '%s' not found in table", fieldName[0]);

// TODO: should validate nested fieldName after column type support namedStruct
}
Expand Down
9 changes: 9 additions & 0 deletions docs/jdbc-doris-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ Note that although Gravitino supports several partitioning strategies, Apache Do
The `fieldName` specified in the partitioning attributes must be the name of columns defined in the table.
:::

### Table distribution

Users can also specify the distribution strategy when creating tables in the Doris catalog. Currently, the Doris catalog supports the following distribution strategies:
- `HASH`
- `RANDOM`

Gravitino represents the Doris `RANDOM` distribution strategy with its `EVEN` strategy. More information about the distribution strategies defined in Gravitino can be found [here](./table-partitioning-distribution-sort-order-indexes.md#table-distribution).


### Table operations

Please refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#table-operations) for more details.
Expand Down
8 changes: 4 additions & 4 deletions docs/manage-relational-metadata-using-gravitino.md
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,14 @@ The following is the table property that Gravitino supports:
| `jdbc-postgresql` | [PostgreSQL table property](./jdbc-postgresql-catalog.md#table-properties) | [PostgreSQL type mapping](./jdbc-postgresql-catalog.md#table-column-types) |
| `doris` | [Doris table property](./jdbc-doris-catalog.md#table-properties) | [Doris type mapping](./jdbc-doris-catalog.md#table-column-types) |

#### Table partitioning, bucketing, sort ordering and indexes
#### Table partitioning, distribution, sort ordering and indexes

In addition to the basic settings, Gravitino supports the following features:

| Feature | Description | Java doc |
|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|
| Feature | Description | Java doc |
|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
| Table partitioning | Equal to `PARTITION BY` in Apache Hive, It is a partitioning strategy that is used to split a table into parts based on partition keys. Some table engine may not support this feature | [Partition](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/dto/rel/partitioning/Partitioning.html) |
| Table bucketing | Equal to `CLUSTERED BY` in Apache Hive, Bucketing a.k.a (Clustering) is a technique to split the data into more manageable files/parts, (By specifying the number of buckets to create). The value of the bucketing column will be hashed by a user-defined number into buckets. | [Distribution](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/expressions/distributions/Distribution.html) |
| Table distribution | Equal to `CLUSTERED BY` in Apache Hive. Distribution (a.k.a. clustering) is a technique to split the data into more manageable files/parts by specifying the number of buckets to create. The value of the distribution column will be hashed by a user-defined number into buckets. | [Distribution](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/expressions/distributions/Distribution.html) |
| Table sort ordering | Equal to `SORTED BY` in Apache Hive, sort ordering is a method to sort the data in specific ways such as by a column or a function, and then store table data. it will highly improve the query performance under certain scenarios. | [SortOrder](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/expressions/sorts/SortOrder.html) |
| Table indexes | Equal to `KEY/INDEX` in MySQL , unique key enforces uniqueness of values in one or more columns within a table. It ensures that no two rows have identical values in specified columns, thereby facilitating data integrity and enabling efficient data retrieval and manipulation operations. | [Index](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/indexes/Index.html) |

Expand Down
Loading

0 comments on commit 820daed

Please sign in to comment.