Skip to content

Commit

Permalink
[apache#1460] feat(iceberg): Update deafult distribution to none.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 16, 2024
1 parent 32e3919 commit d1e43b6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
* </ul>
*/
public enum Strategy {
/**
* No distribution strategy. This is the default strategy. Will depend on the allocation strategy
* of the underlying system.
*/
NONE,
HASH,
RANGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,32 @@ private IcebergTable() {}

public CreateTableRequest toCreateTableRequest() {
Schema schema = ConvertUtil.toIcebergSchema(this);

properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
convertDistribution();
Map<String, String> resultProperties =
Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
CreateTableRequest.Builder builder =
CreateTableRequest.builder()
.withName(name)
.withLocation(location)
.withSchema(schema)
.setProperties(resultProperties)
.withPartitionSpec(ToIcebergPartitionSpec.toPartitionSpec(schema, partitioning))
.withWriteOrder(ToIcebergSortOrder.toSortOrder(schema, sortOrders));
return builder.build();
}

/** Convert the distribution of Iceberg to the distribution of Gravitino. */
private void convertDistribution() {
if (null == distribution) {
properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
} else {
switch (distribution.strategy()) {
case HASH:
Preconditions.checkArgument(
ArrayUtils.isEmpty(distribution.expressions()),
"Iceberg's Distribution Mode.HASH not support set expressions.");
"Iceberg's Distribution Mode.HASH does not support set expressions.");
Preconditions.checkArgument(
ArrayUtils.isNotEmpty(partitioning),
"Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty.");
Expand All @@ -81,24 +97,11 @@ public CreateTableRequest toCreateTableRequest() {
case NONE:
properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
break;
case EVEN:
default:
throw new IllegalArgumentException(
"Iceberg unsupported distribution strategy: " + distribution.strategy());
}
}
Map<String, String> resultProperties =
Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties));
resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
CreateTableRequest.Builder builder =
CreateTableRequest.builder()
.withName(name)
.withLocation(location)
.withSchema(schema)
.setProperties(resultProperties)
.withPartitionSpec(ToIcebergPartitionSpec.toPartitionSpec(schema, partitioning))
.withWriteOrder(ToIcebergSortOrder.toSortOrder(schema, sortOrders));
return builder.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ public void testTableDistribution() {
Assertions.assertTrue(
StringUtils.contains(
illegalArgumentException.getMessage(),
"Iceberg's Distribution Mode.HASH not support set expressions."));
"Iceberg's Distribution Mode.HASH does not support set expressions."));

distribution = Distributions.range();
// Create a data table for Distributions.hash
Expand Down

0 comments on commit d1e43b6

Please sign in to comment.