From 952afe42bfffe30fe5dcd8b98bbb0d6dd2436aac Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Tue, 23 Jan 2024 17:09:08 +0800 Subject: [PATCH] [#1511] feat(trino-connector): Support partition, distribution and sort order of Iceberg table created by Trino (#1563) ### What changes were proposed in this pull request? We can create an Iceberg table with partitioning, distribution, and sort order via Trino. ### Why are the changes needed? It's a crucial feature of the Trino connector. Fix: #1511 ### Does this PR introduce _any_ user-facing change? ```text trino:db1> create table t4(id int, name varchar) with (partitioning = ARRAY['name'], sorted_by = ARRAY['id']); CREATE TABLE trino:db1> show create table t4; Create Table ---------------------------------------------- CREATE TABLE "test.iceberg_catalog".db1.t4 ( id integer, name varchar ) COMMENT '' WITH ( partitioning = ARRAY['name'], sorted_by = ARRAY['id'] ) (1 row) Query 20240117_092158_00001_qufcr, FINISHED, 1 node Splits: 1 total, 1 done (100.00%) 1.08 [0 rows, 0B] [0 rows/s, 0B/s] ``` ### How was this patch tested? 
IT --- .../test/trino/TrinoConnectorIT.java | 31 ++++- .../lakehouse-iceberg/00000_create_table.sql | 59 ++++++++- .../lakehouse-iceberg/00000_create_table.txt | 62 +++++++++ .../iceberg/IcebergMetadataAdapter.java | 125 ++++++++++++++++++ .../catalog/iceberg/IcebergPropertyMeta.java | 28 +++- 5 files changed, 300 insertions(+), 5 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java index 2dc90633377..25d84a9b36e 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java @@ -15,6 +15,7 @@ import com.datastrato.gravitino.dto.rel.SortOrderDTO; import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO; import com.datastrato.gravitino.dto.rel.partitions.IdentityPartitioningDTO; +import com.datastrato.gravitino.dto.rel.partitions.Partitioning.SingleFieldPartitioning; import com.datastrato.gravitino.integration.test.catalog.jdbc.utils.JdbcDriverDownloader; import com.datastrato.gravitino.integration.test.container.ContainerSuite; import com.datastrato.gravitino.integration.test.container.HiveContainer; @@ -899,7 +900,15 @@ void testIcebergTableAndSchemaCreatedByGravitino() throws InterruptedException { ImmutableMap.builder() .put("format-version", "1") .put("key1", "value1") - .build()); + .build(), + new Transform[] {Transforms.identity("BinaryType")}, + Distributions.NONE, + new SortOrder[] { + SortOrders.of( + NamedReference.field("LongType"), + SortDirection.ASCENDING, + NullOrdering.NULLS_FIRST) + }); LOG.info("create table \"{}.{}\".{}.{}", metalakeName, catalogName, schemaName, tableName); Table table = @@ -921,6 +930,26 @@ void testIcebergTableAndSchemaCreatedByGravitino() throws InterruptedException { LOG.info("create 
iceberg hive table sql is: " + data); // Iceberg does not contain any properties; Assertions.assertFalse(data.contains("key1")); + Assertions.assertTrue(data.contains("partitioning = ARRAY['BinaryType']")); + Assertions.assertTrue(data.contains("sorted_by = ARRAY['LongType']")); + + String tableCreatedByTrino = GravitinoITUtils.genRandomName("table").toLowerCase(); + String createTableSql = + String.format( + "CREATE TABLE \"%s.%s\".%s.%s (id int, name varchar) with (partitioning = ARRAY['name'], sorted_by = ARRAY['id'])", + metalakeName, catalogName, schemaName, tableCreatedByTrino); + containerSuite.getTrinoContainer().executeUpdateSQL(createTableSql); + + table = + catalog + .asTableCatalog() + .loadTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, tableCreatedByTrino)); + + Arrays.stream(table.partitioning()) + .anyMatch(p -> ((SingleFieldPartitioning) p).fieldName()[0].equals("name")); + Arrays.stream(table.sortOrder()) + .anyMatch(p -> ((FieldReferenceDTO) p.expression()).fieldName()[0].equals("id")); } @Test diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.sql b/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.sql index be39e91a1f8..752ee63d54d 100644 --- a/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.sql +++ b/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.sql @@ -1,10 +1,67 @@ CREATE SCHEMA "test.gt_iceberg".gt_db2; -CREATE TABLE "test.gt_iceberg".gt_db2.tb01 ( +CREATE TABLE "test.gt_iceberg".gt_db2.tb01( + name varchar, + salary int +); + +show create table "test.gt_iceberg".gt_db2.tb01; + +CREATE TABLE "test.gt_iceberg".gt_db2.tb02 ( + name varchar, + salary int +) with ( + partitioning = ARRAY['name'], + sorted_by = ARRAY['salary'] + ); + +show create table "test.gt_iceberg".gt_db2.tb02; + +CREATE TABLE 
"test.gt_iceberg".gt_db2.tb03 ( + name varchar, + salary int +) with ( + partitioning = ARRAY['name'], + sorted_by = ARRAY['salary_wrong_name'] + ); + +CREATE TABLE "test.gt_iceberg".gt_db2.tb03 ( name varchar, salary int +) with ( + partitioning = ARRAY['name'], + sorted_by = ARRAY['name'] + ); + +show create table "test.gt_iceberg".gt_db2.tb03; + + +CREATE TABLE "test.gt_iceberg".gt_db2.tb04 ( + name varchar, + salary int +) with ( + sorted_by = ARRAY['name'] +); + +show create table "test.gt_iceberg".gt_db2.tb04; + +CREATE TABLE "test.gt_iceberg".gt_db2.tb05 ( + name varchar, + salary int +) with ( + partitioning = ARRAY['name'] ); +show create table "test.gt_iceberg".gt_db2.tb05; + drop table "test.gt_iceberg".gt_db2.tb01; +drop table "test.gt_iceberg".gt_db2.tb02; + +drop table "test.gt_iceberg".gt_db2.tb03; + +drop table "test.gt_iceberg".gt_db2.tb04; + +drop table "test.gt_iceberg".gt_db2.tb05; + drop schema "test.gt_iceberg".gt_db2; \ No newline at end of file diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.txt b/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.txt index 2862f23f236..4a59b73ab7f 100644 --- a/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.txt +++ b/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00000_create_table.txt @@ -2,6 +2,68 @@ CREATE SCHEMA CREATE TABLE +"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb01 ( + name varchar, + salary integer +) +COMMENT '' + +CREATE TABLE + +"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb02 ( + name varchar, + salary integer +) +COMMENT '' +WITH ( + partitioning = ARRAY['name'], + sorted_by = ARRAY['salary'] +)" + + partition field salary_wrong_name not found in table + +CREATE TABLE + +"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb03 ( + name varchar, + salary integer +) +COMMENT '' +WITH ( + partitioning = 
ARRAY['name'], + sorted_by = ARRAY['name'] +)" + +CREATE TABLE + +"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb04 ( + name varchar, + salary integer +) +COMMENT '' +WITH ( + sorted_by = ARRAY['name'] +)" + +CREATE TABLE + +"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb05 ( + name varchar, + salary integer +) +COMMENT '' +WITH ( + partitioning = ARRAY['name'] +)" + +DROP TABLE + +DROP TABLE + +DROP TABLE + +DROP TABLE + DROP TABLE DROP SCHEMA diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java index 3c7ecfc15c9..23a4ff42fbf 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java @@ -4,11 +4,34 @@ */ package com.datastrato.gravitino.trino.connector.catalog.iceberg; +import static com.datastrato.gravitino.trino.connector.catalog.iceberg.IcebergPropertyMeta.ICEBERG_PARTITIONING_PROPERTY; +import static com.datastrato.gravitino.trino.connector.catalog.iceberg.IcebergPropertyMeta.ICEBERG_SORTED_BY_PROPERTY; + import com.datastrato.catalog.property.PropertyConverter; +import com.datastrato.gravitino.dto.rel.partitions.Partitioning.SingleFieldPartitioning; +import com.datastrato.gravitino.rel.expressions.Expression; +import com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import 
com.datastrato.gravitino.trino.connector.metadata.GravitinoColumn; +import com.datastrato.gravitino.trino.connector.metadata.GravitinoTable; +import com.google.common.collect.ImmutableSet; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; import io.trino.spi.session.PropertyMetadata; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; /** Transforming gravitino Iceberg metadata to trino. */ public class IcebergMetadataAdapter extends CatalogConnectorMetadataAdapter { @@ -17,6 +40,9 @@ public class IcebergMetadataAdapter extends CatalogConnectorMetadataAdapter { private final PropertyConverter tableConverter; private final PropertyConverter schemaConverter; + private static final Set ICEBERG_PROPERTIES_TO_REMOVE = + ImmutableSet.of(ICEBERG_PARTITIONING_PROPERTY, ICEBERG_SORTED_BY_PROPERTY); + public IcebergMetadataAdapter( List> schemaProperties, List> tableProperties, @@ -49,4 +75,103 @@ public Map toGravitinoSchemaProperties(Map prope Map stringMap = schemaConverter.engineToGravitinoProperties(properties); return super.toGravitinoSchemaProperties(stringMap); } + + @Override + public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) { + String tableName = tableMetadata.getTableSchema().getTable().getTableName(); + String schemaName = tableMetadata.getTableSchema().getTable().getSchemaName(); + String comment = tableMetadata.getComment().orElse(""); + + Map propertyMap = tableMetadata.getProperties(); + List partitionColumns = + propertyMap.containsKey(ICEBERG_PARTITIONING_PROPERTY) + ? 
(List) propertyMap.get(ICEBERG_PARTITIONING_PROPERTY) + : Collections.EMPTY_LIST; + + List sortColumns = + propertyMap.containsKey(ICEBERG_SORTED_BY_PROPERTY) + ? (List) propertyMap.get(ICEBERG_SORTED_BY_PROPERTY) + : Collections.EMPTY_LIST; + + Map properties = + toGravitinoTableProperties( + removeKeys(tableMetadata.getProperties(), ICEBERG_PROPERTIES_TO_REMOVE)); + + List columns = new ArrayList<>(); + for (int i = 0; i < tableMetadata.getColumns().size(); i++) { + ColumnMetadata column = tableMetadata.getColumns().get(i); + columns.add( + new GravitinoColumn( + column.getName(), + dataTypeTransformer.getGravitinoType(column.getType()), + i, + column.getComment(), + column.isNullable())); + } + GravitinoTable gravitinoTable = + new GravitinoTable(schemaName, tableName, columns, comment, properties); + + if (!partitionColumns.isEmpty()) { + Transform[] partitioning = + partitionColumns.stream().map(Transforms::identity).toArray(Transform[]::new); + gravitinoTable.setPartitioning(partitioning); + } + + if (!sortColumns.isEmpty()) { + SortOrder[] sorting = + sortColumns.stream() + .map( + sortingColumn -> { + Expression expression = NamedReference.field(sortingColumn); + return SortOrders.ascending(expression); + }) + .toArray(SortOrder[]::new); + gravitinoTable.setSortOrders(sorting); + } + + return gravitinoTable; + } + + @Override + public ConnectorTableMetadata getTableMetadata(GravitinoTable gravitinoTable) { + SchemaTableName schemaTableName = + new SchemaTableName(gravitinoTable.getSchemaName(), gravitinoTable.getName()); + ArrayList columnMetadataList = new ArrayList<>(); + for (GravitinoColumn column : gravitinoTable.getColumns()) { + columnMetadataList.add(getColumnMetadata(column)); + } + + Map properties = toTrinoTableProperties(gravitinoTable.getProperties()); + + if (ArrayUtils.isNotEmpty(gravitinoTable.getPartitioning())) { + // Only support simple partition now like partition by a, b, c. 
+ // Format like partition like partition by year(a), b, c is NOT supported now. + properties.put( + ICEBERG_PARTITIONING_PROPERTY, + gravitinoTable.getPartitioning().length > 0 + ? Arrays.stream(gravitinoTable.getPartitioning()) + .map(ts -> ((SingleFieldPartitioning) ts).fieldName()[0]) + .collect(Collectors.toList()) + : Collections.EMPTY_LIST); + } + + if (ArrayUtils.isNotEmpty(gravitinoTable.getSortOrders())) { + // Only support the simple format + properties.put( + ICEBERG_SORTED_BY_PROPERTY, + Arrays.stream(gravitinoTable.getSortOrders()) + .map( + sortOrder -> { + Expression expression = sortOrder.expression(); + return ((NamedReference) expression).fieldName()[0]; + }) + .collect(Collectors.toList())); + } + + return new ConnectorTableMetadata( + schemaTableName, + columnMetadataList, + properties, + Optional.ofNullable(gravitinoTable.getComment())); + } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergPropertyMeta.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergPropertyMeta.java index cb0d464e0ce..80d04172924 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergPropertyMeta.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergPropertyMeta.java @@ -5,24 +5,46 @@ package com.datastrato.gravitino.trino.connector.catalog.iceberg; +import static io.trino.spi.type.VarcharType.VARCHAR; + import com.datastrato.gravitino.trino.connector.catalog.HasPropertyMeta; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.ArrayType; import java.util.List; import java.util.Map; public class IcebergPropertyMeta implements HasPropertyMeta { + public static final String ICEBERG_PARTITIONING_PROPERTY = "partitioning"; + public static final String 
ICEBERG_SORTED_BY_PROPERTY = "sorted_by"; + // Value is whether this property is reserved and cannot be used by users // TODO (yuqi) add more properties public static final Map, Boolean> TABLE_PROPERTY_TO_RESERVED_MAP = new ImmutableMap.Builder().build(); public static final List> TABLE_PROPERTY_META = - TABLE_PROPERTY_TO_RESERVED_MAP.entrySet().stream() - .map(Map.Entry::getKey) - .collect(ImmutableList.toImmutableList()); + ImmutableList.of( + new PropertyMetadata<>( + ICEBERG_PARTITIONING_PROPERTY, + "Partition transforms", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value), + new PropertyMetadata<>( + ICEBERG_SORTED_BY_PROPERTY, + "Sorted columns", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value)); // TODO (yuqi) add more properties public static final Map, Boolean> SCHEMA_PROPERTY_TO_RESERVED_MAP =