Skip to content

Commit

Permalink
[#1511] feat(trino-connector): Support partition, distribution and so…
Browse files Browse the repository at this point in the history
…rt order of Iceberg table created by Trino (#1563)

### What changes were proposed in this pull request?

We can create an Iceberg table with partitioning, distribution, and sort
order through Trino.

### Why are the changes needed?

It's a crucial feature of the Trino connector.

Fix: #1511 

### Does this PR introduce _any_ user-facing change?

```text
trino:db1> create table t4(id int, name varchar) with (partitioning = ARRAY['name'], sorted_by = ARRAY['id']);
CREATE TABLE
trino:db1> show create table t4;
                 Create Table
----------------------------------------------
 CREATE TABLE "test.iceberg_catalog".db1.t4 (
    id integer,
    name varchar
 )
 COMMENT ''
 WITH (
    partitioning = ARRAY['name'],
    sorted_by = ARRAY['id']
 )
(1 row)

Query 20240117_092158_00001_qufcr, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
1.08 [0 rows, 0B] [0 rows/s, 0B/s]
```


### How was this patch tested?

IT
  • Loading branch information
yuqi1129 authored Jan 23, 2024
1 parent 28cb2f7 commit 952afe4
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.dto.rel.SortOrderDTO;
import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO;
import com.datastrato.gravitino.dto.rel.partitions.IdentityPartitioningDTO;
import com.datastrato.gravitino.dto.rel.partitions.Partitioning.SingleFieldPartitioning;
import com.datastrato.gravitino.integration.test.catalog.jdbc.utils.JdbcDriverDownloader;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
Expand Down Expand Up @@ -899,7 +900,15 @@ void testIcebergTableAndSchemaCreatedByGravitino() throws InterruptedException {
ImmutableMap.<String, String>builder()
.put("format-version", "1")
.put("key1", "value1")
.build());
.build(),
new Transform[] {Transforms.identity("BinaryType")},
Distributions.NONE,
new SortOrder[] {
SortOrders.of(
NamedReference.field("LongType"),
SortDirection.ASCENDING,
NullOrdering.NULLS_FIRST)
});
LOG.info("create table \"{}.{}\".{}.{}", metalakeName, catalogName, schemaName, tableName);

Table table =
Expand All @@ -921,6 +930,26 @@ void testIcebergTableAndSchemaCreatedByGravitino() throws InterruptedException {
LOG.info("create iceberg hive table sql is: " + data);
// Iceberg does not contain any properties;
Assertions.assertFalse(data.contains("key1"));
Assertions.assertTrue(data.contains("partitioning = ARRAY['BinaryType']"));
Assertions.assertTrue(data.contains("sorted_by = ARRAY['LongType']"));

String tableCreatedByTrino = GravitinoITUtils.genRandomName("table").toLowerCase();
String createTableSql =
String.format(
"CREATE TABLE \"%s.%s\".%s.%s (id int, name varchar) with (partitioning = ARRAY['name'], sorted_by = ARRAY['id'])",
metalakeName, catalogName, schemaName, tableCreatedByTrino);
containerSuite.getTrinoContainer().executeUpdateSQL(createTableSql);

table =
catalog
.asTableCatalog()
.loadTable(
NameIdentifier.of(metalakeName, catalogName, schemaName, tableCreatedByTrino));

Arrays.stream(table.partitioning())
.anyMatch(p -> ((SingleFieldPartitioning) p).fieldName()[0].equals("name"));
Arrays.stream(table.sortOrder())
.anyMatch(p -> ((FieldReferenceDTO) p.expression()).fieldName()[0].equals("id"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,67 @@
CREATE SCHEMA "test.gt_iceberg".gt_db2;

CREATE TABLE "test.gt_iceberg".gt_db2.tb01 (
CREATE TABLE "test.gt_iceberg".gt_db2.tb01(
name varchar,
salary int
);

show create table "test.gt_iceberg".gt_db2.tb01;

CREATE TABLE "test.gt_iceberg".gt_db2.tb02 (
name varchar,
salary int
) with (
partitioning = ARRAY['name'],
sorted_by = ARRAY['salary']
);

show create table "test.gt_iceberg".gt_db2.tb02;

CREATE TABLE "test.gt_iceberg".gt_db2.tb03 (
name varchar,
salary int
) with (
partitioning = ARRAY['name'],
sorted_by = ARRAY['salary_wrong_name']
);

CREATE TABLE "test.gt_iceberg".gt_db2.tb03 (
name varchar,
salary int
) with (
partitioning = ARRAY['name'],
sorted_by = ARRAY['name']
);

show create table "test.gt_iceberg".gt_db2.tb03;


CREATE TABLE "test.gt_iceberg".gt_db2.tb04 (
name varchar,
salary int
) with (
sorted_by = ARRAY['name']
);

show create table "test.gt_iceberg".gt_db2.tb04;

CREATE TABLE "test.gt_iceberg".gt_db2.tb05 (
name varchar,
salary int
) with (
partitioning = ARRAY['name']
);

show create table "test.gt_iceberg".gt_db2.tb05;

drop table "test.gt_iceberg".gt_db2.tb01;

drop table "test.gt_iceberg".gt_db2.tb02;

drop table "test.gt_iceberg".gt_db2.tb03;

drop table "test.gt_iceberg".gt_db2.tb04;

drop table "test.gt_iceberg".gt_db2.tb05;

drop schema "test.gt_iceberg".gt_db2;
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,68 @@ CREATE SCHEMA

CREATE TABLE

"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb01 (
name varchar,
salary integer
)
COMMENT ''

CREATE TABLE

"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb02 (
name varchar,
salary integer
)
COMMENT ''
WITH (
partitioning = ARRAY['name'],
sorted_by = ARRAY['salary']
)"

<QUERY_FAILED> partition field salary_wrong_name not found in table

CREATE TABLE

"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb03 (
name varchar,
salary integer
)
COMMENT ''
WITH (
partitioning = ARRAY['name'],
sorted_by = ARRAY['name']
)"

CREATE TABLE

"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb04 (
name varchar,
salary integer
)
COMMENT ''
WITH (
sorted_by = ARRAY['name']
)"

CREATE TABLE

"CREATE TABLE ""test.gt_iceberg"".gt_db2.tb05 (
name varchar,
salary integer
)
COMMENT ''
WITH (
partitioning = ARRAY['name']
)"

DROP TABLE

DROP TABLE

DROP TABLE

DROP TABLE

DROP TABLE

DROP SCHEMA
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,34 @@
*/
package com.datastrato.gravitino.trino.connector.catalog.iceberg;

import static com.datastrato.gravitino.trino.connector.catalog.iceberg.IcebergPropertyMeta.ICEBERG_PARTITIONING_PROPERTY;
import static com.datastrato.gravitino.trino.connector.catalog.iceberg.IcebergPropertyMeta.ICEBERG_SORTED_BY_PROPERTY;

import com.datastrato.catalog.property.PropertyConverter;
import com.datastrato.gravitino.dto.rel.partitions.Partitioning.SingleFieldPartitioning;
import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoColumn;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoTable;
import com.google.common.collect.ImmutableSet;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.session.PropertyMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;

/** Transforming gravitino Iceberg metadata to trino. */
public class IcebergMetadataAdapter extends CatalogConnectorMetadataAdapter {
Expand All @@ -17,6 +40,9 @@ public class IcebergMetadataAdapter extends CatalogConnectorMetadataAdapter {
private final PropertyConverter tableConverter;
private final PropertyConverter schemaConverter;

private static final Set<String> ICEBERG_PROPERTIES_TO_REMOVE =
ImmutableSet.of(ICEBERG_PARTITIONING_PROPERTY, ICEBERG_SORTED_BY_PROPERTY);

public IcebergMetadataAdapter(
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
Expand Down Expand Up @@ -49,4 +75,103 @@ public Map<String, String> toGravitinoSchemaProperties(Map<String, Object> prope
Map<String, Object> stringMap = schemaConverter.engineToGravitinoProperties(properties);
return super.toGravitinoSchemaProperties(stringMap);
}

@Override
public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) {
  String tableName = tableMetadata.getTableSchema().getTable().getTableName();
  String schemaName = tableMetadata.getTableSchema().getTable().getSchemaName();
  String comment = tableMetadata.getComment().orElse("");

  Map<String, Object> propertyMap = tableMetadata.getProperties();

  // Trino delivers the WITH-clause array properties (see IcebergPropertyMeta)
  // as List<String>; fall back to a typed empty list when absent.
  @SuppressWarnings("unchecked")
  List<String> partitionColumns =
      (List<String>)
          propertyMap.getOrDefault(ICEBERG_PARTITIONING_PROPERTY, Collections.emptyList());

  @SuppressWarnings("unchecked")
  List<String> sortColumns =
      (List<String>)
          propertyMap.getOrDefault(ICEBERG_SORTED_BY_PROPERTY, Collections.emptyList());

  // Partitioning and sort order are first-class table metadata in Gravitino,
  // so strip them from the generic property map before conversion.
  Map<String, String> properties =
      toGravitinoTableProperties(removeKeys(propertyMap, ICEBERG_PROPERTIES_TO_REMOVE));

  List<GravitinoColumn> columns = new ArrayList<>();
  for (int i = 0; i < tableMetadata.getColumns().size(); i++) {
    ColumnMetadata column = tableMetadata.getColumns().get(i);
    columns.add(
        new GravitinoColumn(
            column.getName(),
            dataTypeTransformer.getGravitinoType(column.getType()),
            i,
            column.getComment(),
            column.isNullable()));
  }
  GravitinoTable gravitinoTable =
      new GravitinoTable(schemaName, tableName, columns, comment, properties);

  if (!partitionColumns.isEmpty()) {
    // Only identity partitioning (partition by a plain column) is supported here;
    // transform expressions such as year(a) are not parsed from the property.
    Transform[] partitioning =
        partitionColumns.stream().map(Transforms::identity).toArray(Transform[]::new);
    gravitinoTable.setPartitioning(partitioning);
  }

  if (!sortColumns.isEmpty()) {
    // NOTE(review): sorted_by entries are treated as bare column names and always
    // mapped to ascending order; "col DESC" style suffixes are not parsed — confirm.
    SortOrder[] sorting =
        sortColumns.stream()
            .map(sortingColumn -> SortOrders.ascending(NamedReference.field(sortingColumn)))
            .toArray(SortOrder[]::new);
    gravitinoTable.setSortOrders(sorting);
  }

  return gravitinoTable;
}

@Override
public ConnectorTableMetadata getTableMetadata(GravitinoTable gravitinoTable) {
  SchemaTableName schemaTableName =
      new SchemaTableName(gravitinoTable.getSchemaName(), gravitinoTable.getName());
  List<ColumnMetadata> columnMetadataList = new ArrayList<>();
  for (GravitinoColumn column : gravitinoTable.getColumns()) {
    columnMetadataList.add(getColumnMetadata(column));
  }

  Map<String, Object> properties = toTrinoTableProperties(gravitinoTable.getProperties());

  if (ArrayUtils.isNotEmpty(gravitinoTable.getPartitioning())) {
    // Only single-field identity partitioning (partition by a, b, c) is surfaced;
    // transform expressions such as partition by year(a) are NOT supported yet.
    // isNotEmpty already guarantees at least one element, so no length re-check
    // is needed here (the original ternary's EMPTY_LIST arm was unreachable).
    properties.put(
        ICEBERG_PARTITIONING_PROPERTY,
        Arrays.stream(gravitinoTable.getPartitioning())
            .map(transform -> ((SingleFieldPartitioning) transform).fieldName()[0])
            .collect(Collectors.toList()));
  }

  if (ArrayUtils.isNotEmpty(gravitinoTable.getSortOrders())) {
    // Only simple field-reference sort expressions are surfaced; direction and
    // null ordering are dropped when mapping back to the Trino property.
    properties.put(
        ICEBERG_SORTED_BY_PROPERTY,
        Arrays.stream(gravitinoTable.getSortOrders())
            .map(sortOrder -> ((NamedReference) sortOrder.expression()).fieldName()[0])
            .collect(Collectors.toList()));
  }

  return new ConnectorTableMetadata(
      schemaTableName,
      columnMetadataList,
      properties,
      Optional.ofNullable(gravitinoTable.getComment()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,46 @@

package com.datastrato.gravitino.trino.connector.catalog.iceberg;

import static io.trino.spi.type.VarcharType.VARCHAR;

import com.datastrato.gravitino.trino.connector.catalog.HasPropertyMeta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;
import java.util.List;
import java.util.Map;

public class IcebergPropertyMeta implements HasPropertyMeta {

public static final String ICEBERG_PARTITIONING_PROPERTY = "partitioning";
public static final String ICEBERG_SORTED_BY_PROPERTY = "sorted_by";

// Value is whether this property is reserved and cannot be used by users
// TODO (yuqi) add more properties
public static final Map<PropertyMetadata<?>, Boolean> TABLE_PROPERTY_TO_RESERVED_MAP =
new ImmutableMap.Builder().build();

public static final List<PropertyMetadata<?>> TABLE_PROPERTY_META =
TABLE_PROPERTY_TO_RESERVED_MAP.entrySet().stream()
.map(Map.Entry::getKey)
.collect(ImmutableList.toImmutableList());
ImmutableList.of(
new PropertyMetadata<>(
ICEBERG_PARTITIONING_PROPERTY,
"Partition transforms",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value),
new PropertyMetadata<>(
ICEBERG_SORTED_BY_PROPERTY,
"Sorted columns",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value));

// TODO (yuqi) add more properties
public static final Map<PropertyMetadata<?>, Boolean> SCHEMA_PROPERTY_TO_RESERVED_MAP =
Expand Down

0 comments on commit 952afe4

Please sign in to comment.