Skip to content

Commit

Permalink
Convert DeltaLakeColumnHandle to record
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 13, 2024
1 parent 43f3979 commit b033bf9
Show file tree
Hide file tree
Showing 32 changed files with 169 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,25 @@ public AbstractDeltaLakePageSink(
}
for (int inputIndex = 0; inputIndex < inputColumns.size(); inputIndex++) {
DeltaLakeColumnHandle column = inputColumns.get(inputIndex);
switch (column.getColumnType()) {
switch (column.columnType()) {
case PARTITION_KEY:
int partitionPosition = toOriginalPartitionPositions.get(column.getColumnName());
int partitionPosition = toOriginalPartitionPositions.get(column.columnName());
partitionColumnInputIndex[partitionPosition] = inputIndex;
originalPartitionColumnNames[partitionPosition] = column.getColumnName();
partitionColumnTypes[partitionPosition] = column.getBaseType();
originalPartitionColumnNames[partitionPosition] = column.columnName();
partitionColumnTypes[partitionPosition] = column.baseType();
break;
case REGULAR:
verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
dataColumnHandles.add(column);
dataColumnsInputIndex.add(inputIndex);
dataColumnNames.add(column.getBasePhysicalColumnName());
dataColumnTypes.add(column.getBasePhysicalType());
dataColumnNames.add(column.basePhysicalColumnName());
dataColumnTypes.add(column.basePhysicalType());
break;
case SYNTHESIZED:
processSynthesizedColumn(column);
break;
default:
throw new IllegalStateException("Unexpected column type: " + column.getColumnType());
throw new IllegalStateException("Unexpected column type: " + column.columnType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DeltaLakeBucketFunction(TypeOperators typeOperators, List<DeltaLakeColumn
{
this.hashCodeInvokers = partitioningColumns.stream()
.peek(column -> verify(column.isBaseColumn(), "Unexpected dereference: %s", column))
.map(DeltaLakeColumnHandle::getBaseType)
.map(DeltaLakeColumnHandle::baseType)
.map(type -> typeOperators.getHashCodeOperator(type, simpleConvention(FAIL_ON_NULL, BLOCK_POSITION_NOT_NULL)))
.collect(toImmutableList());
this.bucketCount = bucketCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@
*/
package io.trino.plugin.deltalake;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;

import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;

Expand All @@ -39,7 +36,18 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class DeltaLakeColumnHandle
/**
* @param basePhysicalColumnName Hold field names in Parquet files The value is same as 'name' when the column mapping mode is none The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. col-6707cc9e-f3aa-4e6b-b8ef-1b03d3475680
* @param basePhysicalType Hold type in Parquet files The value is same as 'type' when the column mapping mode is none The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. row(col-5924c8b3-04cf-4146-abb5-2c229e7ff708 integer)
*/
public record DeltaLakeColumnHandle(
String baseColumnName,
Type baseType,
OptionalInt baseFieldId,
String basePhysicalColumnName,
Type basePhysicalType,
DeltaLakeColumnType columnType,
Optional<DeltaLakeColumnProjectionInfo> projectionInfo)
implements ColumnHandle
{
private static final int INSTANCE_SIZE = instanceSize(DeltaLakeColumnHandle.class);
Expand All @@ -61,116 +69,32 @@ public class DeltaLakeColumnHandle
public static final String FILE_MODIFIED_TIME_COLUMN_NAME = "$file_modified_time";
public static final Type FILE_MODIFIED_TIME_TYPE = TIMESTAMP_TZ_MILLIS;

private final String baseColumnName;
private final Type baseType;
private final OptionalInt baseFieldId;
// Hold field names in Parquet files
// The value is same as 'name' when the column mapping mode is none
// The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. col-6707cc9e-f3aa-4e6b-b8ef-1b03d3475680
private final String basePhysicalColumnName;
// Hold type in Parquet files
// The value is same as 'type' when the column mapping mode is none
// The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. row(col-5924c8b3-04cf-4146-abb5-2c229e7ff708 integer)
private final Type basePhysicalType;
private final DeltaLakeColumnType columnType;
private final Optional<DeltaLakeColumnProjectionInfo> projectionInfo;

@JsonCreator
public DeltaLakeColumnHandle(
@JsonProperty("baseColumnName") String baseColumnName,
@JsonProperty("baseType") Type baseType,
@JsonProperty("baseFieldId") OptionalInt baseFieldId,
@JsonProperty("basePhysicalColumnName") String basePhysicalColumnName,
@JsonProperty("basePhysicalType") Type basePhysicalType,
@JsonProperty("columnType") DeltaLakeColumnType columnType,
@JsonProperty("projectionInfo") Optional<DeltaLakeColumnProjectionInfo> projectionInfo)
public DeltaLakeColumnHandle
{
this.baseColumnName = requireNonNull(baseColumnName, "baseColumnName is null");
this.baseType = requireNonNull(baseType, "baseType is null");
this.baseFieldId = requireNonNull(baseFieldId, "baseFieldId is null");
this.basePhysicalColumnName = requireNonNull(basePhysicalColumnName, "basePhysicalColumnName is null");
this.basePhysicalType = requireNonNull(basePhysicalType, "basePhysicalType is null");
this.columnType = requireNonNull(columnType, "columnType is null");
requireNonNull(baseColumnName, "baseColumnName is null");
requireNonNull(baseType, "baseType is null");
requireNonNull(baseFieldId, "baseFieldId is null");
requireNonNull(basePhysicalColumnName, "basePhysicalColumnName is null");
requireNonNull(basePhysicalType, "basePhysicalType is null");
requireNonNull(columnType, "columnType is null");
checkArgument(projectionInfo.isEmpty() || columnType == REGULAR, "Projection info present for column type: %s", columnType);
this.projectionInfo = projectionInfo;
}

@JsonProperty
public String getBaseColumnName()
{
return baseColumnName;
}

@JsonProperty
public Type getBaseType()
{
return baseType;
}

@JsonProperty
public OptionalInt getBaseFieldId()
{
return baseFieldId;
}

@JsonProperty
public String getBasePhysicalColumnName()
{
return basePhysicalColumnName;
}

@JsonProperty
public Type getBasePhysicalType()
{
return basePhysicalType;
}

@JsonProperty
public DeltaLakeColumnType getColumnType()
{
return columnType;
}

@JsonProperty
public Optional<DeltaLakeColumnProjectionInfo> getProjectionInfo()
{
return projectionInfo;
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DeltaLakeColumnHandle other = (DeltaLakeColumnHandle) obj;
return Objects.equals(this.baseColumnName, other.baseColumnName) &&
Objects.equals(this.baseType, other.baseType) &&
Objects.equals(this.baseFieldId, other.baseFieldId) &&
Objects.equals(this.basePhysicalColumnName, other.basePhysicalColumnName) &&
Objects.equals(this.basePhysicalType, other.basePhysicalType) &&
this.columnType == other.columnType &&
Objects.equals(this.projectionInfo, other.projectionInfo);
}

@JsonIgnore
public String getColumnName()
public String columnName()
{
checkState(isBaseColumn(), "Unexpected dereference: %s", this);
return baseColumnName;
}

@JsonIgnore
public String getQualifiedPhysicalName()
public String qualifiedPhysicalName()
{
return projectionInfo.map(projectionInfo -> basePhysicalColumnName + "#" + projectionInfo.getPartialName())
.orElse(basePhysicalColumnName);
}

public long getRetainedSizeInBytes()
public long retainedSizeInBytes()
{
// type is not accounted for as the instances are cached (by TypeRegistry) and shared
return INSTANCE_SIZE
Expand All @@ -187,22 +111,16 @@ public boolean isBaseColumn()
}

@JsonIgnore
public Type getType()
public Type type()
{
return projectionInfo.map(DeltaLakeColumnProjectionInfo::getType)
.orElse(baseType);
}

@Override
public int hashCode()
{
return Objects.hash(baseColumnName, baseType, baseFieldId, basePhysicalColumnName, basePhysicalType, columnType, projectionInfo);
}

@Override
public String toString()
{
return getQualifiedPhysicalName() +
return qualifiedPhysicalName() +
":" + projectionInfo.map(DeltaLakeColumnProjectionInfo::getType).orElse(baseType).getDisplayName() +
":" + columnType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ public DeltaLakeMergeSink(
requireNonNull(tableColumns, "tableColumns is null");
this.tableColumnCount = tableColumns.size();
this.dataColumns = tableColumns.stream()
.filter(column -> column.getColumnType() == REGULAR)
.filter(column -> column.columnType() == REGULAR)
.collect(toImmutableList());
this.domainCompactionThreshold = domainCompactionThreshold;
this.nonSynthesizedColumns = tableColumns.stream()
.filter(column -> column.getColumnType() != SYNTHESIZED)
.filter(column -> column.columnType() != SYNTHESIZED)
.collect(toImmutableList());
this.cdfPageSinkSupplier = requireNonNull(cdfPageSinkSupplier);
this.cdfEnabled = cdfEnabled;
Expand Down Expand Up @@ -370,10 +370,10 @@ private ParquetFileWriter createParquetFileWriter(Location path, List<DeltaLakeC
dataColumns.forEach(column -> verify(column.isBaseColumn(), "Unexpected dereference: %s", column));

List<Type> parquetTypes = dataColumns.stream()
.map(column -> toParquetType(typeOperators, column.getBasePhysicalType()))
.map(column -> toParquetType(typeOperators, column.basePhysicalType()))
.collect(toImmutableList());
List<String> dataColumnNames = dataColumns.stream()
.map(DeltaLakeColumnHandle::getBasePhysicalColumnName)
.map(DeltaLakeColumnHandle::basePhysicalColumnName)
.collect(toImmutableList());

return new ParquetFileWriter(
Expand Down Expand Up @@ -475,13 +475,13 @@ private void storeCdfEntries(Page page, int[] deleted, int deletedCount, FileDel
int partitionIndex = 0;
List<String> partitionValues = deletion.partitionValues;
for (int i = 0; i < nonSynthesizedColumns.size(); i++) {
if (nonSynthesizedColumns.get(i).getColumnType() == REGULAR) {
if (nonSynthesizedColumns.get(i).columnType() == REGULAR) {
outputBlocks[i] = cdfPage.getBlock(cdfPageIndex);
cdfPageIndex++;
}
else {
outputBlocks[i] = RunLengthEncodedBlock.create(nativeValueToBlock(
nonSynthesizedColumns.get(i).getBaseType(),
nonSynthesizedColumns.get(i).baseType(),
deserializePartitionValue(
nonSynthesizedColumns.get(i),
Optional.ofNullable(partitionValues.get(partitionIndex)))),
Expand Down
Loading

0 comments on commit b033bf9

Please sign in to comment.