Skip to content

Commit

Permalink
[apache#1460] feat(iceberg): Iceberg supports distributions.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 18, 2024
1 parent 0e7f35e commit 73bff7f
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ public class Distributions {

// NONE indicates that there is no distribution. It is built with Strategy.NONE, number 0 and
// Expression.EMPTY_EXPRESSION.
public static final Distribution NONE =
new DistributionImpl(Strategy.NONE, 0, Expression.EMPTY_EXPRESSION);

// HASH is a shared constant for the hash strategy, built with number 0 and
// Expression.EMPTY_EXPRESSION.
// TODO: #1505 Separate the bucket number from the Distribution.
public static final Distribution HASH =
new DistributionImpl(Strategy.HASH, 0, Expression.EMPTY_EXPRESSION);

// RANGE is a shared constant for the range strategy, built with number 0 and
// Expression.EMPTY_EXPRESSION.
// TODO: #1505 Separate the bucket number from the Distribution.
public static final Distribution RANGE =
new DistributionImpl(Strategy.RANGE, 0, Expression.EMPTY_EXPRESSION);

/**
* Create a distribution by evenly distributing the data across the number of buckets.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
* </ul>
*/
public enum Strategy {
/**
* No distribution strategy. This is the default strategy. Will depend on the allocation strategy
* of the underlying system.
*/
NONE,
HASH,
RANGE,
EVEN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.utils.MapUtils;
Expand Down Expand Up @@ -483,10 +482,6 @@ public Table createTable(
SortOrder[] sortOrders)
throws NoSuchSchemaException, TableAlreadyExistsException {
try {
if (!Distributions.NONE.equals(distribution)) {
throw new UnsupportedOperationException("Iceberg does not support distribution");
}

NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels());
if (!schemaExists(schemaIdent)) {
LOG.warn("Iceberg schema (database) does not exist: {}", schemaIdent);
Expand All @@ -512,7 +507,7 @@ public Table createTable(
.withPartitioning(partitioning)
.withSortOrders(sortOrders)
.withProperties(properties)
.withDistribution(Distributions.NONE)
.withDistribution(distribution)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg;

import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder;
Expand All @@ -12,10 +14,17 @@
import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper;
import com.datastrato.gravitino.catalog.rel.BaseTable;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Map;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.rest.requests.CreateTableRequest;
Expand Down Expand Up @@ -45,7 +54,8 @@ private IcebergTable() {}

public CreateTableRequest toCreateTableRequest() {
Schema schema = ConvertUtil.toIcebergSchema(this);

properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
convertDistribution();
Map<String, String> resultProperties =
Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
Expand All @@ -60,6 +70,40 @@ public CreateTableRequest toCreateTableRequest() {
return builder.build();
}

/**
 * Converts the Gravitino distribution of this table into Iceberg's write distribution mode and
 * stores the mode name in {@code properties} under the {@code DISTRIBUTION_MODE} key.
 *
 * <ul>
 *   <li>A {@code null} distribution or the {@code NONE} strategy is stored as {@code
 *       DistributionMode.NONE}.
 *   <li>{@code HASH} is stored as {@code DistributionMode.HASH}; the distribution must carry no
 *       expressions and the table must have partitioning.
 *   <li>{@code RANGE} is stored as {@code DistributionMode.RANGE}; the distribution must carry
 *       no expressions and the table must have partitioning or sort orders.
 * </ul>
 *
 * @throws IllegalArgumentException if one of the preconditions above is violated, or if the
 *     strategy is none of {@code NONE}, {@code HASH} or {@code RANGE}.
 */
private void convertDistribution() {
  if (null == distribution) {
    // No distribution was supplied: record Iceberg's "none" mode explicitly.
    properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
    return;
  }

  switch (distribution.strategy()) {
    case HASH:
      // The mode is stored as a bare name, so explicit expressions cannot be carried over.
      Preconditions.checkArgument(
          ArrayUtils.isEmpty(distribution.expressions()),
          "Iceberg's Distribution Mode.HASH does not support set expressions.");
      Preconditions.checkArgument(
          ArrayUtils.isNotEmpty(partitioning),
          "Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty.");
      properties.put(DISTRIBUTION_MODE, DistributionMode.HASH.modeName());
      break;
    case RANGE:
      // Same restriction as HASH: only the mode name is stored, never expressions.
      Preconditions.checkArgument(
          ArrayUtils.isEmpty(distribution.expressions()),
          "Iceberg's Distribution Mode.RANGE does not support set expressions.");
      Preconditions.checkArgument(
          ArrayUtils.isNotEmpty(partitioning) || ArrayUtils.isNotEmpty(sortOrders),
          "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty.");
      properties.put(DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
      break;
    case NONE:
      properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
      break;
    default:
      throw new IllegalArgumentException(
          "Iceberg unsupported distribution strategy: " + distribution.strategy());
  }
}

/**
* Creates a new IcebergTable instance from a Table and a Builder.
*
Expand All @@ -69,8 +113,24 @@ public CreateTableRequest toCreateTableRequest() {
*/
public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) {
Map<String, String> properties = table.properties();

Schema schema = table.schema();
Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder());
Distribution distribution = Distributions.NONE;
String distributionName = properties.get(DISTRIBUTION_MODE);
if (null != distributionName) {
switch (DistributionMode.fromName(distributionName)) {
case HASH:
distribution = Distributions.HASH;
break;
case RANGE:
distribution = Distributions.RANGE;
break;
default:
// do nothing
break;
}
}
IcebergColumn[] icebergColumns =
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
return new IcebergTable.Builder()
Expand All @@ -80,8 +140,9 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
.withColumns(icebergColumns)
.withName(tableName)
.withAuditInfo(AuditInfo.EMPTY)
.withPartitioning(FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema))
.withSortOrders(FromIcebergSortOrder.fromSortOrder(table.sortOrder()))
.withPartitioning(partitionSpec)
.withSortOrders(sortOrder)
.withDistribution(distribution)
.build();
}

Expand Down Expand Up @@ -115,6 +176,7 @@ protected IcebergTable internalBuild() {
icebergTable.location = icebergTable.properties.get(PROP_LOCATION);
}
icebergTable.partitioning = partitioning;
icebergTable.distribution = distribution;
icebergTable.sortOrders = sortOrders;
if (null != comment) {
icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.TableProperties;

public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
public static final String COMMENT = "comment";
Expand All @@ -22,6 +23,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
public static final String SORT_ORDER = "sort-order";
public static final String IDENTIFIER_FIELDS = "identifier-fields";

public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE;

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

static {
Expand All @@ -41,7 +44,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
stringReservedPropertyEntry(
SORT_ORDER, "Selecting a specific snapshot in a merge operation", false),
stringReservedPropertyEntry(
IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false));
IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false),
stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Strategy;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Preconditions;
Expand All @@ -22,14 +23,16 @@
public class DistributionDTO implements Distribution {

public static final DistributionDTO NONE =
new Builder().withStrategy(Strategy.HASH).withNumber(0).withArgs(EMPTY_ARGS).build();
new Builder().withStrategy(Strategy.NONE).withNumber(0).withArgs(EMPTY_ARGS).build();

// Distribution strategy/method
private final Strategy strategy;

@JsonIgnoreProperties(ignoreUnknown = true)
// Number of buckets/distribution
private final int number;

@JsonIgnoreProperties(ignoreUnknown = true)
private final FunctionArg[] args;

private DistributionDTO(Strategy strategy, int number, FunctionArg[] args) {
Expand Down
44 changes: 33 additions & 11 deletions docs/lakehouse-iceberg-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,31 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the

### Table distributions

- Doesn't support `Distribution`, you should use `BucketPartition` instead.
- Gravitino uses `NoneDistribution` by default. Hash distribution distributes data by partition key.
```json
{
"strategy": "hash"
}
```
```java
Distributions.hash();
```

- Supports `RangeDistribution`. You can pass `range` as the value through the API. Range distribution distributes data by partition key, or by sort key if the table has a SortOrder.
```json
{
"strategy": "range"
}
```
```java
Distributions.range();
```

:::info
If you load Iceberg tables, the table distribution strategy is `hash` with num 0, which means no distribution.
Iceberg automatically calculates the bucket value and performs hashing based on the partition field, or automatically calculates ranges according to SortOrder.
:::
:::info
Apache Iceberg doesn't support Gravitino `EvenDistribution` type.
:::

### Table column types
Expand Down Expand Up @@ -149,15 +170,16 @@ You can pass [Iceberg table properties](https://iceberg.apache.org/docs/1.3.1/co

The Gravitino server doesn't allow passing the following reserved fields.

| Configuration item | Description |
|---------------------------|---------------------------------------------------------|
| `comment` | The table comment. |
| `creator` | The table creator. |
| `location` | Iceberg location for table storage. |
| `current-snapshot-id` | The snapshot represents the current state of the table. |
| `cherry-pick-snapshot-id` | Selecting a specific snapshot in a merge operation. |
| `sort-order` | Selecting a specific snapshot in a merge operation. |
| `identifier-fields` | The identifier fields for defining the table. |
| Configuration item | Description |
|---------------------------------|---------------------------------------------------------|
| `comment` | The table comment. |
| `creator` | The table creator. |
| `location` | Iceberg location for table storage. |
| `current-snapshot-id` | The snapshot represents the current state of the table. |
| `cherry-pick-snapshot-id` | Selecting a specific snapshot in a merge operation. |
| `sort-order` | The sort order of the table. |
| `identifier-fields` | The identifier fields for defining the table. |
| `write.distribution-mode` | Defines the distribution of write data. |

### Table operations

Expand Down
Loading

0 comments on commit 73bff7f

Please sign in to comment.