Skip to content

Commit

Permalink
[apache#1460] feat(iceberg): Iceberg supports distributions.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 12, 2024
1 parent 0b5a8cc commit 6c701cd
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ public static Distribution even(int number, Expression... expressions) {
return new DistributionImpl(Strategy.EVEN, number, expressions);
}

/**
* Create a distribution by hashing the data.
*
* @return The created hash distribution
*/
public static Distribution hash(Expression... expressions) {
return hash(0, expressions);
}

/**
* Create a distribution by hashing the data across the number of buckets.
*
Expand All @@ -37,6 +46,27 @@ public static Distribution hash(int number, Expression... expressions) {
return new DistributionImpl(Strategy.HASH, number, expressions);
}

/**
* Create a distribution by range the data.
*
* @param expressions The expressions to distribute by
* @return The created range distribution
*/
public static Distribution range(Expression... expressions) {
return range(0, expressions);
}

/**
* Create a distribution by range the data across the number of buckets.
*
* @param number The number of buckets
* @param expressions The expressions to distribute by
* @return The created range distribution
*/
public static Distribution range(int number, Expression... expressions) {
return new DistributionImpl(Strategy.RANGE, number, expressions);
}

/**
* Create a distribution by the given strategy.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.utils.MapUtils;
Expand Down Expand Up @@ -483,10 +482,6 @@ public Table createTable(
SortOrder[] sortOrders)
throws NoSuchSchemaException, TableAlreadyExistsException {
try {
if (!Distributions.NONE.equals(distribution)) {
throw new UnsupportedOperationException("Iceberg does not support distribution");
}

NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels());
if (!schemaExists(schemaIdent)) {
LOG.warn("Iceberg schema (database) does not exist: {}", schemaIdent);
Expand All @@ -512,7 +507,7 @@ public Table createTable(
.withPartitioning(partitioning)
.withSortOrders(sortOrders)
.withProperties(properties)
.withDistribution(Distributions.NONE)
.withDistribution(distribution)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg;

import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder;
Expand All @@ -12,10 +14,17 @@
import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper;
import com.datastrato.gravitino.catalog.rel.BaseTable;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Map;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.rest.requests.CreateTableRequest;
Expand Down Expand Up @@ -46,6 +55,35 @@ private IcebergTable() {}
public CreateTableRequest toCreateTableRequest() {
Schema schema = ConvertUtil.toIcebergSchema(this);

properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
if (null == distribution) {
properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
} else {
switch (distribution.strategy()) {
case HASH:
Preconditions.checkArgument(
ArrayUtils.isEmpty(distribution.expressions()),
"Iceberg's Distribution Mode.HASH not support set expressions.");
Preconditions.checkArgument(
ArrayUtils.isNotEmpty(partitioning),
"Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty.");
properties.put(DISTRIBUTION_MODE, DistributionMode.HASH.modeName());
break;
case RANGE:
Preconditions.checkArgument(
ArrayUtils.isEmpty(distribution.expressions()),
"Iceberg's Distribution Mode.RANGE not support set expressions.");
Preconditions.checkArgument(
ArrayUtils.isNotEmpty(partitioning) && ArrayUtils.isNotEmpty(sortOrders),
"Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty.");
properties.put(DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
break;
case EVEN:
default:
throw new IllegalArgumentException(
"Iceberg unsupported distribution strategy: " + distribution.strategy());
}
}
Map<String, String> resultProperties =
Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
Expand All @@ -69,8 +107,30 @@ public CreateTableRequest toCreateTableRequest() {
*/
public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) {
Map<String, String> properties = table.properties();

Schema schema = table.schema();
Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder());
Distribution distribution = Distributions.NONE;
String distributionName = properties.get(DISTRIBUTION_MODE);
if (null != distributionName) {
switch (DistributionMode.fromName(distributionName)) {
case HASH:
Preconditions.checkArgument(
ArrayUtils.isNotEmpty(partitionSpec),
"Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty.");
distribution = Distributions.hash();
break;
case RANGE:
Preconditions.checkArgument(
ArrayUtils.isNotEmpty(partitionSpec) && ArrayUtils.isNotEmpty(sortOrder),
"Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty.");
distribution = Distributions.range();
break;
case NONE:
// do nothing
break;
}
}
IcebergColumn[] icebergColumns =
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
return new IcebergTable.Builder()
Expand All @@ -80,8 +140,9 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
.withColumns(icebergColumns)
.withName(tableName)
.withAuditInfo(AuditInfo.EMPTY)
.withPartitioning(FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema))
.withSortOrders(FromIcebergSortOrder.fromSortOrder(table.sortOrder()))
.withPartitioning(partitionSpec)
.withSortOrders(sortOrder)
.withDistribution(distribution)
.build();
}

Expand Down Expand Up @@ -115,6 +176,7 @@ protected IcebergTable internalBuild() {
icebergTable.location = icebergTable.properties.get(PROP_LOCATION);
}
icebergTable.partitioning = partitioning;
icebergTable.distribution = distribution;
icebergTable.sortOrders = sortOrders;
if (null != comment) {
icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.TableProperties;

public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
public static final String COMMENT = "comment";
Expand All @@ -22,6 +23,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
public static final String SORT_ORDER = "sort-order";
public static final String IDENTIFIER_FIELDS = "identifier-fields";

public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE;

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

static {
Expand All @@ -41,7 +44,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
stringReservedPropertyEntry(
SORT_ORDER, "Selecting a specific snapshot in a merge operation", false),
stringReservedPropertyEntry(
IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false));
IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false),
stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Strategy;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Preconditions;
Expand All @@ -27,9 +28,11 @@ public class DistributionDTO implements Distribution {
// Distribution strategy/method
private final Strategy strategy;

@JsonIgnoreProperties(ignoreUnknown = true)
// Number of buckets/distribution
private final int number;

@JsonIgnoreProperties(ignoreUnknown = true)
private final FunctionArg[] args;

private DistributionDTO(Strategy strategy, int number, FunctionArg[] args) {
Expand Down
22 changes: 20 additions & 2 deletions docs/lakehouse-iceberg-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,28 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the

### Table distributions

- Doesn't support `Distribution`, you should use `BucketPartition` instead.
- Gravitino used by default `HashDistribution`. Hash distribute by partition key.
```json
{
"strategy": "hash"
}
```
```java
Distributions.hash();
```

- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder.
```json
{
"strategy": "range"
}
```
```java
Distributions.range();
```

:::info
If you load Iceberg tables, the table distribution strategy is `hash` with num 0, which means no distribution.
Iceberg automatically calculates the bucket value and performs hashing based on the partition field, or automatically calculates ranges according to SortOrder.
:::

### Table column types
Expand Down
Loading

0 comments on commit 6c701cd

Please sign in to comment.