From 73bff7f3312bd20e858eef32ad270e96715898d6 Mon Sep 17 00:00:00 2001 From: Clearvive Date: Fri, 12 Jan 2024 15:54:17 +0800 Subject: [PATCH] [#1460] feat(iceberg): Iceberg supports distributions. --- .../distributions/Distributions.java | 8 + .../expressions/distributions/Strategy.java | 5 + .../iceberg/IcebergCatalogOperations.java | 7 +- .../lakehouse/iceberg/IcebergTable.java | 70 +++++++- .../IcebergTablePropertiesMetadata.java | 6 +- .../gravitino/dto/rel/DistributionDTO.java | 5 +- docs/lakehouse-iceberg-catalog.md | 44 +++-- .../lakehouse/iceberg/CatalogIcebergIT.java | 164 ++++++++++++++++++ 8 files changed, 286 insertions(+), 23 deletions(-) diff --git a/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java b/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java index 091c75ee580..c701dbb83cf 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java @@ -13,8 +13,16 @@ public class Distributions { // NONE is used to indicate that there is no distribution. public static final Distribution NONE = + new DistributionImpl(Strategy.NONE, 0, Expression.EMPTY_EXPRESSION); + + // List bucketing strategy hash, TODO: #1505 Separate the bucket number from the Distribution. + public static final Distribution HASH = new DistributionImpl(Strategy.HASH, 0, Expression.EMPTY_EXPRESSION); + // List bucketing strategy range, TODO: #1505 Separate the bucket number from the Distribution. + public static final Distribution RANGE = + new DistributionImpl(Strategy.RANGE, 0, Expression.EMPTY_EXPRESSION); + /** * Create a distribution by evenly distributing the data across the number of buckets. * diff --git a/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Strategy.java b/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Strategy.java index 5928c096142..73dff0ffbb0 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Strategy.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Strategy.java @@ -18,6 +18,11 @@ * */ public enum Strategy { + /** + * No distribution strategy. This is the default strategy. Will depend on the allocation strategy + * of the underlying system. + */ + NONE, HASH, RANGE, EVEN; diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 4b6cdaf9074..566c4b04355 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -27,7 +27,6 @@ import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; -import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.utils.MapUtils; @@ -483,10 +482,6 @@ public Table createTable( SortOrder[] sortOrders) throws NoSuchSchemaException, TableAlreadyExistsException { try { - if (!Distributions.NONE.equals(distribution)) { - throw new UnsupportedOperationException("Iceberg does not support distribution"); - } - NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels()); if (!schemaExists(schemaIdent)) { LOG.warn("Iceberg schema (database) does not exist: {}", schemaIdent); @@ -512,7 +507,7 @@ public Table createTable( .withPartitioning(partitioning) .withSortOrders(sortOrders) .withProperties(properties) - .withDistribution(Distributions.NONE) + .withDistribution(distribution) .withAuditInfo( new AuditInfo.Builder() .withCreator(currentUser()) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 6ed6de81731..f38f6214f67 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE; + import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder; @@ -12,10 +14,17 @@ import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper; import com.datastrato.gravitino.catalog.rel.BaseTable; import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.Map; import lombok.Getter; import lombok.ToString; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -45,7 +54,8 @@ private IcebergTable() {} public CreateTableRequest toCreateTableRequest() { Schema schema = ConvertUtil.toIcebergSchema(this); - + properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties); + convertDistribution(); Map resultProperties = Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties)); resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); @@ -60,6 +70,40 @@ public CreateTableRequest toCreateTableRequest() { return builder.build(); } + /** Convert the distribution of Iceberg to the distribution of Gravitino. */ + private void convertDistribution() { + if (null == distribution) { + properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + } else { + switch (distribution.strategy()) { + case HASH: + Preconditions.checkArgument( + ArrayUtils.isEmpty(distribution.expressions()), + "Iceberg's Distribution Mode.HASH does not support set expressions."); + Preconditions.checkArgument( + ArrayUtils.isNotEmpty(partitioning), + "Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty."); + properties.put(DISTRIBUTION_MODE, DistributionMode.HASH.modeName()); + break; + case RANGE: + Preconditions.checkArgument( + ArrayUtils.isEmpty(distribution.expressions()), + "Iceberg's Distribution Mode.RANGE not support set expressions."); + Preconditions.checkArgument( + ArrayUtils.isNotEmpty(partitioning) || ArrayUtils.isNotEmpty(sortOrders), + "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty."); + properties.put(DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()); + break; + case NONE: + properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + break; + default: + throw new IllegalArgumentException( + "Iceberg unsupported distribution strategy: " + distribution.strategy()); + } + } + } + /** * Creates a new IcebergTable instance from a Table and a Builder. * @@ -69,8 +113,24 @@ public CreateTableRequest toCreateTableRequest() { */ public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) { Map properties = table.properties(); - Schema schema = table.schema(); + Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema); + SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder()); + Distribution distribution = Distributions.NONE; + String distributionName = properties.get(DISTRIBUTION_MODE); + if (null != distributionName) { + switch (DistributionMode.fromName(distributionName)) { + case HASH: + distribution = Distributions.HASH; + break; + case RANGE: + distribution = Distributions.RANGE; + break; + default: + // do nothing + break; + } + } IcebergColumn[] icebergColumns = schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new); return new IcebergTable.Builder() @@ -80,8 +140,9 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam .withColumns(icebergColumns) .withName(tableName) .withAuditInfo(AuditInfo.EMPTY) - .withPartitioning(FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema)) - .withSortOrders(FromIcebergSortOrder.fromSortOrder(table.sortOrder())) + .withPartitioning(partitionSpec) + .withSortOrders(sortOrder) + .withDistribution(distribution) .build(); } @@ -115,6 +176,7 @@ protected IcebergTable internalBuild() { icebergTable.location = icebergTable.properties.get(PROP_LOCATION); } icebergTable.partitioning = partitioning; + icebergTable.distribution = distribution; icebergTable.sortOrders = sortOrders; if (null != comment) { icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java index c227f80d64f..9b3e5314d45 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java @@ -12,6 +12,7 @@ import com.google.common.collect.Maps; import java.util.List; import java.util.Map; +import org.apache.iceberg.TableProperties; public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String COMMENT = "comment"; @@ -22,6 +23,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String SORT_ORDER = "sort-order"; public static final String IDENTIFIER_FIELDS = "identifier-fields"; + public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE; + private static final Map> PROPERTIES_METADATA; static { @@ -41,7 +44,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { stringReservedPropertyEntry( SORT_ORDER, "Selecting a specific snapshot in a merge operation", false), stringReservedPropertyEntry( - IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false)); + IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false), + stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false)); PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java index 58d550e1ddf..795f702e714 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.rel.expressions.Expression; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.distributions.Strategy; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.common.base.Preconditions; @@ -22,14 +23,16 @@ public class DistributionDTO implements Distribution { public static final DistributionDTO NONE = - new Builder().withStrategy(Strategy.HASH).withNumber(0).withArgs(EMPTY_ARGS).build(); + new Builder().withStrategy(Strategy.NONE).withNumber(0).withArgs(EMPTY_ARGS).build(); // Distribution strategy/method private final Strategy strategy; + @JsonIgnoreProperties(ignoreUnknown = true) // Number of buckets/distribution private final int number; + @JsonIgnoreProperties(ignoreUnknown = true) private final FunctionArg[] args; private DistributionDTO(Strategy strategy, int number, FunctionArg[] args) { diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index a69e7abc623..72a6f6691e4 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -111,10 +111,31 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the ### Table distributions -- Doesn't support `Distribution`, you should use `BucketPartition` instead. +- Gravitino used by default `NoneDistribution`. Hash distribute by partition key. +```json +{ + "strategy": "hash" +} +``` +```java +Distributions.hash(); +``` + +- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder. +```json +{ + "strategy": "range" +} +``` +```java +Distributions.range(); +``` :::info -If you load Iceberg tables, the table distribution strategy is `hash` with num 0, which means no distribution. +Iceberg automatically calculates the bucket value and performs hashing based on the partition field, or automatically calculates ranges according to SortOrder. +::: +:::info +Apache Iceberg doesn't support Gravitino `EvenDistribution` type. ::: ### Table column types @@ -149,15 +170,16 @@ You can pass [Iceberg table properties](https://iceberg.apache.org/docs/1.3.1/co The Gravitino server doesn't allow passing the following reserved fields. -| Configuration item | Description | -|---------------------------|---------------------------------------------------------| -| `comment` | The table comment. | -| `creator` | The table creator. | -| `location` | Iceberg location for table storage. | -| `current-snapshot-id` | The snapshot represents the current state of the table. | -| `cherry-pick-snapshot-id` | Selecting a specific snapshot in a merge operation. | -| `sort-order` | Selecting a specific snapshot in a merge operation. | -| `identifier-fields` | The identifier fields for defining the table. | +| Configuration item | Description | +|---------------------------------|---------------------------------------------------------| +| `comment` | The table comment. | +| `creator` | The table creator. | +| `location` | Iceberg location for table storage. | +| `current-snapshot-id` | The snapshot represents the current state of the table. | +| `cherry-pick-snapshot-id` | Selecting a specific snapshot in a merge operation. | +| `sort-order` | Selecting a specific snapshot in a merge operation. | +| `identifier-fields` | The identifier fields for defining the table. | +| `write.distribution-mode` | Defines distribution of write data | ### Table operations diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java index de771e688d4..1fa6a06ee65 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java @@ -18,6 +18,7 @@ import com.datastrato.gravitino.client.GravitinoMetaLake; import com.datastrato.gravitino.dto.rel.ColumnDTO; import com.datastrato.gravitino.dto.rel.partitions.Partitioning; +import com.datastrato.gravitino.dto.util.DTOConverters; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; @@ -35,6 +36,7 @@ import com.datastrato.gravitino.rel.expressions.NamedReference; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.distributions.Strategy; import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering; import com.datastrato.gravitino.rel.expressions.sorts.SortDirection; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; @@ -53,6 +55,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionField; @@ -808,4 +811,165 @@ public void testOperatorSchemeProperties() { Assertions.assertThrows( NoSuchSchemaException.class, () -> catalog.asSchemas().loadSchema(ident)); } + + @Test + public void testTableDistribution() { + ColumnDTO[] columns = createColumns(); + + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + Distribution distribution = Distributions.NONE; + + final SortOrder[] sortOrders = + new SortOrder[] { + SortOrders.of( + NamedReference.field(ICEBERG_COL_NAME2), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }; + + Partitioning[] partitioning = new Partitioning[] {DayPartitioningDTO.of(columns[1].name())}; + + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + // Create a data table for Distributions.NONE + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + + // check table + assertionsTableInfo( + tableName, + table_comment, + Arrays.asList(columns), + properties, + distribution, + sortOrders, + partitioning, + loadTable); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + + distribution = Distributions.HASH; + // Create a data table for Distributions.hash + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + + loadTable = tableCatalog.loadTable(tableIdentifier); + // check table + assertionsTableInfo( + tableName, + table_comment, + Arrays.asList(columns), + properties, + distribution, + sortOrders, + partitioning, + loadTable); + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + + // Create a data table for Distributions.NONE and set field name + IllegalArgumentException illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> { + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + Distributions.hash(0, NamedReference.field(ICEBERG_COL_NAME1)), + sortOrders); + }); + Assertions.assertTrue( + StringUtils.contains( + illegalArgumentException.getMessage(), + "Iceberg's Distribution Mode.HASH does not support set expressions.")); + + distribution = Distributions.RANGE; + // Create a data table for Distributions.hash + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + + loadTable = tableCatalog.loadTable(tableIdentifier); + // check table + assertionsTableInfo( + tableName, + table_comment, + Arrays.asList(columns), + properties, + distribution, + sortOrders, + partitioning, + loadTable); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + + // Create a data table for Distributions.range and set field name + illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> { + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + Distributions.of(Strategy.RANGE, 0, NamedReference.field(ICEBERG_COL_NAME1)), + sortOrders); + }); + Assertions.assertTrue( + StringUtils.contains( + illegalArgumentException.getMessage(), + "Iceberg's Distribution Mode.RANGE not support set expressions.")); + } + + protected static void assertionsTableInfo( + String tableName, + String tableComment, + List columns, + Map properties, + Distribution distribution, + SortOrder[] sortOrder, + Partitioning[] partitioning, + Table table) { + Assertions.assertEquals(tableName, table.name()); + Assertions.assertEquals(tableComment, table.comment()); + Assertions.assertEquals(columns.size(), table.columns().length); + Assertions.assertEquals(DTOConverters.toDTO(distribution), table.distribution()); + Assertions.assertArrayEquals(DTOConverters.toDTOs(sortOrder), table.sortOrder()); + Assertions.assertArrayEquals(DTOConverters.toDTOs(partitioning), table.partitioning()); + for (int i = 0; i < columns.size(); i++) { + Assertions.assertEquals(columns.get(i).name(), table.columns()[i].name()); + Assertions.assertEquals(columns.get(i).dataType(), table.columns()[i].dataType()); + Assertions.assertEquals(columns.get(i).nullable(), table.columns()[i].nullable()); + Assertions.assertEquals(columns.get(i).comment(), table.columns()[i].comment()); + Assertions.assertEquals(columns.get(i).autoIncrement(), table.columns()[i].autoIncrement()); + } + + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertEquals(entry.getValue(), table.properties().get(entry.getKey())); + } + } }