Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1748333 Fix Iceberg decimal type schema parser #864

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
public void setupSchema(List<ColumnMetadata> columns) {
fieldIndex.clear();
metadata.clear();
metadata.put("sfVer", "1,1");
if (!clientBufferParameters.getIsIcebergMode()) {
metadata.put("sfVer", "1,1");
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved
}
List<Type> parquetTypes = new ArrayList<>();
int id = 1;

Expand Down
207 changes: 198 additions & 9 deletions src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

package net.snowflake.ingest.utils;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -12,18 +20,21 @@
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.iceberg.parquet.TypeToMessageType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/**
* This class is used to Iceberg data type (include primitive types and nested types) serialization
* and deserialization.
*
* <p>This code is modified from
* GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg
* /IcebergDataTypeParser.java
* /IcebergDataTypeParser.java and <a
* href="https://github.com/apache/iceberg/blob/main/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java">
* TypeToMessageType.java</a>
*/
public class IcebergDataTypeParser {
private static final String TYPE = "type";
Expand All @@ -44,12 +55,26 @@ public class IcebergDataTypeParser {
private static final String ELEMENT_REQUIRED = "element-required";
private static final String VALUE_REQUIRED = "value-required";

private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType();
private static final LogicalTypeAnnotation DATE = LogicalTypeAnnotation.dateType();
private static final LogicalTypeAnnotation TIME_MICROS =
LogicalTypeAnnotation.timeType(
false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMP_MICROS =
LogicalTypeAnnotation.timestampType(
false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS =
LogicalTypeAnnotation.timestampType(
true /* adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);

private static final int DECIMAL_INT32_MAX_DIGITS = 9;
private static final int DECIMAL_INT64_MAX_DIGITS = 18;
private static final int DECIMAL_MAX_DIGITS = 38;
private static final int DECIMAL_MAX_BYTES = 16;

/** Object mapper for this class */
private static final ObjectMapper MAPPER = new ObjectMapper();

sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved
/** Util class that contains the mapping between Iceberg data type and Parquet data type */
private static final TypeToMessageType typeToMessageType = new TypeToMessageType();

/**
* Get Iceberg data type information by deserialization.
*
Expand All @@ -66,15 +91,15 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet
String name) {
Type icebergType = deserializeIcebergType(icebergDataType);
if (icebergType.isPrimitiveType()) {
return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
return primitive(icebergType.asPrimitiveType(), repetition, id, name);
} else {
switch (icebergType.typeId()) {
case LIST:
return typeToMessageType.list(icebergType.asListType(), repetition, id, name);
return list(icebergType.asListType(), repetition, id, name);
case MAP:
return typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
return map(icebergType.asMapType(), repetition, id, name);
case STRUCT:
return typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
return struct(icebergType.asStructType(), repetition, id, name);
default:
throw new SFException(
ErrorCode.INTERNAL_ERROR,
Expand Down Expand Up @@ -208,4 +233,168 @@ public static Types.MapType mapFromJson(JsonNode json) {
return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
}
}

/**
 * Converts an Iceberg struct type into a Parquet {@link GroupType}.
 *
 * @param struct Iceberg struct whose fields are converted recursively
 * @param repetition Parquet repetition (REQUIRED/OPTIONAL) for the resulting group
 * @param id Iceberg field id attached to the Parquet type
 * @param name name of the resulting Parquet group
 * @return Parquet group type mirroring the Iceberg struct
 */
private static GroupType struct(
    Types.StructType struct,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  org.apache.parquet.schema.Types.GroupBuilder<GroupType> builder =
      org.apache.parquet.schema.Types.buildGroup(repetition);
  // Each struct member becomes one Parquet field, converted via field().
  struct.fields().forEach(member -> builder.addField(field(member)));
  return builder.id(id).named(name);
}

/**
 * Converts a single Iceberg field (primitive or nested) into the corresponding Parquet type,
 * preserving its field id, name, and optionality.
 *
 * @param field Iceberg field to convert
 * @return equivalent Parquet type
 * @throws UnsupportedOperationException if the nested type is neither struct, map, nor list
 */
private static org.apache.parquet.schema.Type field(Types.NestedField field) {
  final int id = field.fieldId();
  final String name = field.name();
  // Iceberg optionality maps directly onto Parquet repetition.
  final org.apache.parquet.schema.Type.Repetition repetition;
  if (field.isOptional()) {
    repetition = org.apache.parquet.schema.Type.Repetition.OPTIONAL;
  } else {
    repetition = org.apache.parquet.schema.Type.Repetition.REQUIRED;
  }

  if (field.type().isPrimitiveType()) {
    return primitive(field.type().asPrimitiveType(), repetition, id, name);
  }

  Type.NestedType nested = field.type().asNestedType();
  if (nested.isStructType()) {
    return struct(nested.asStructType(), repetition, id, name);
  }
  if (nested.isMapType()) {
    return map(nested.asMapType(), repetition, id, name);
  }
  if (nested.isListType()) {
    return list(nested.asListType(), repetition, id, name);
  }
  throw new UnsupportedOperationException("Can't convert unknown type: " + nested);
}

/**
 * Converts an Iceberg list type into a Parquet 3-level list group.
 *
 * @param list Iceberg list type; its single field describes the element
 * @param repetition Parquet repetition for the list group itself
 * @param id Iceberg field id attached to the Parquet type
 * @param name name of the resulting Parquet group
 * @return Parquet list group type
 */
private static GroupType list(
    Types.ListType list,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  // An Iceberg list exposes exactly one nested field: the element.
  return org.apache.parquet.schema.Types.list(repetition)
      .element(field(list.fields().get(0)))
      .id(id)
      .named(name);
}

/**
 * Converts an Iceberg map type into a Parquet map group (key_value repeated group).
 *
 * @param map Iceberg map type; fields()[0] is the key, fields()[1] is the value
 * @param repetition Parquet repetition for the map group itself
 * @param id Iceberg field id attached to the Parquet type
 * @param name name of the resulting Parquet group
 * @return Parquet map group type
 */
private static GroupType map(
    Types.MapType map,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  // Iceberg maps always carry exactly two nested fields, in key-then-value order.
  List<Types.NestedField> keyValue = map.fields();
  return org.apache.parquet.schema.Types.map(repetition)
      .key(field(keyValue.get(0)))
      .value(field(keyValue.get(1)))
      .id(id)
      .named(name);
}

/**
 * Converts an Iceberg primitive type into the corresponding Parquet primitive type with the
 * appropriate logical-type annotation, following the Iceberg-to-Parquet mapping (modeled on
 * Iceberg's TypeToMessageType), except that large decimals are always stored as fixed(16) —
 * see the DECIMAL case below.
 *
 * @param primitive Iceberg primitive type to convert
 * @param repetition Parquet repetition (REQUIRED/OPTIONAL) for the resulting field
 * @param id Iceberg field id attached to the Parquet type
 * @param name name of the resulting Parquet field
 * @return annotated Parquet primitive type
 * @throws IllegalArgumentException if a decimal's precision exceeds {@code DECIMAL_MAX_DIGITS}
 * @throws UnsupportedOperationException if the Iceberg type has no Parquet mapping here
 */
public static org.apache.parquet.schema.Type primitive(
    Type.PrimitiveType primitive,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  switch (primitive.typeId()) {
    case BOOLEAN:
      return org.apache.parquet.schema.Types.primitive(BOOLEAN, repetition).id(id).named(name);
    case INTEGER:
      return org.apache.parquet.schema.Types.primitive(INT32, repetition).id(id).named(name);
    case LONG:
      return org.apache.parquet.schema.Types.primitive(INT64, repetition).id(id).named(name);
    case FLOAT:
      return org.apache.parquet.schema.Types.primitive(FLOAT, repetition).id(id).named(name);
    case DOUBLE:
      return org.apache.parquet.schema.Types.primitive(DOUBLE, repetition).id(id).named(name);
    case DATE:
      // Days since epoch stored as int32 with DATE annotation.
      return org.apache.parquet.schema.Types.primitive(INT32, repetition)
          .as(DATE)
          .id(id)
          .named(name);
    case TIME:
      // Iceberg time is microsecond precision; stored as int64.
      return org.apache.parquet.schema.Types.primitive(INT64, repetition)
          .as(TIME_MICROS)
          .id(id)
          .named(name);
    case TIMESTAMP:
      // Annotation differs by whether the Iceberg timestamp is UTC-adjusted (timestamptz).
      if (((Types.TimestampType) primitive).shouldAdjustToUTC()) {
        return org.apache.parquet.schema.Types.primitive(INT64, repetition)
            .as(TIMESTAMPTZ_MICROS)
            .id(id)
            .named(name);
      } else {
        return org.apache.parquet.schema.Types.primitive(INT64, repetition)
            .as(TIMESTAMP_MICROS)
            .id(id)
            .named(name);
      }
    case STRING:
      return org.apache.parquet.schema.Types.primitive(BINARY, repetition)
          .as(STRING)
          .id(id)
          .named(name);
    case BINARY:
      return org.apache.parquet.schema.Types.primitive(BINARY, repetition).id(id).named(name);
    case FIXED:
      Types.FixedType fixed = (Types.FixedType) primitive;

      return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
          .length(fixed.length())
          .id(id)
          .named(name);

    case DECIMAL:
      Types.DecimalType decimal = (Types.DecimalType) primitive;

      if (decimal.precision() <= DECIMAL_INT32_MAX_DIGITS) {
        /* Precision <= 9 digits fits in an int32. */
        return org.apache.parquet.schema.Types.primitive(INT32, repetition)
            .as(decimalAnnotation(decimal.precision(), decimal.scale()))
            .id(id)
            .named(name);

      } else if (decimal.precision() <= DECIMAL_INT64_MAX_DIGITS) {
        /* Precision <= 18 digits fits in an int64. */
        return org.apache.parquet.schema.Types.primitive(INT64, repetition)
            .as(decimalAnnotation(decimal.precision(), decimal.scale()))
            .id(id)
            .named(name);

      } else {
        /*
         * Deviates from the Iceberg spec, which requires the minimum number of bytes for the
         * precision; we always use fixed(16) (SB16) instead.
         */
        if (decimal.precision() > DECIMAL_MAX_DIGITS) {
          throw new IllegalArgumentException(
              "Unsupported decimal precision: " + decimal.precision());
        }
        return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
            .length(DECIMAL_MAX_BYTES)
            .as(decimalAnnotation(decimal.precision(), decimal.scale()))
            .id(id)
            .named(name);
      }

    case UUID:
      // UUIDs are 16-byte fixed arrays with the UUID logical annotation.
      return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
          .length(16)
          .as(LogicalTypeAnnotation.uuidType())
          .id(id)
          .named(name);

    default:
      throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive);
  }
}

/**
 * Builds a Parquet decimal logical-type annotation.
 *
 * <p>Note the argument-order swap: this helper takes (precision, scale) but
 * {@link LogicalTypeAnnotation#decimalType} takes (scale, precision).
 */
private static LogicalTypeAnnotation decimalAnnotation(int precision, int scale) {
  return LogicalTypeAnnotation.decimalType(scale, precision);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.datatypes;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
Expand All @@ -26,6 +34,8 @@ public static Object[][] parameters() {
@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

static final Random generator = new Random(0x5EED);
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved

@Before
public void before() throws Exception {
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
Expand Down Expand Up @@ -306,6 +316,7 @@ public void testDecimal() throws Exception {
testIcebergIngestion("decimal(3, 1)", 12.5f, new FloatProvider());
testIcebergIngestion("decimal(3, 1)", -99, new IntProvider());
testIcebergIngestion("decimal(38, 0)", Long.MAX_VALUE, new LongProvider());
testIcebergIngestion("decimal(21, 0)", .0, new DoubleProvider());
testIcebergIngestion("decimal(38, 10)", null, new BigDecimalProvider());

testIcebergIngestion(
Expand Down Expand Up @@ -368,5 +379,48 @@ public void testDecimalAndQueries() throws Exception {
Arrays.asList(new BigDecimal("-12.3"), new BigDecimal("-12.3"), null),
"select COUNT({columnName}) from {tableName} where {columnName} = -12.3",
Arrays.asList(2L));

List<Object> bigDecimals_9_4 = randomBigDecimal(200, 9, 4);
testIcebergIngestAndQuery(
"decimal(9, 4)", bigDecimals_9_4, "select {columnName} from {tableName}", bigDecimals_9_4);

List<Object> bigDecimals_18_9 = randomBigDecimal(200, 18, 9);
testIcebergIngestAndQuery(
"decimal(18, 9)",
bigDecimals_18_9,
"select {columnName} from {tableName}",
bigDecimals_18_9);

List<Object> bigDecimals_21_0 = randomBigDecimal(200, 21, 0);
testIcebergIngestAndQuery(
"decimal(21, 0)",
bigDecimals_21_0,
"select {columnName} from {tableName}",
bigDecimals_21_0);

List<Object> bigDecimals_38_10 = randomBigDecimal(200, 38, 10);
testIcebergIngestAndQuery(
"decimal(38, 10)",
bigDecimals_38_10,
"select {columnName} from {tableName}",
bigDecimals_38_10);
}

private static List<Object> randomBigDecimal(int count, int precision, int scale) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I fully understand what this method is trying to do (comments would help). Let's get this merged to unblock the KC folks, and I'll follow up with you offline about this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments

List<Object> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
int intPart = generator.nextInt(precision - scale + 1);
int floatPart = generator.nextInt(scale + 1);
if (intPart == 0 && floatPart == 0) {
list.add(null);
continue;
}
list.add(
new BigDecimal(
RandomStringUtils.randomNumeric(intPart)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes it impossible to reproduce failures locally. Why do we need randomNumeric here when we already generated random numbers for intPart and floatPart using the seeded RNG on lines 412 and 413?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of decimal digits can exceed what fits in an int/long, so we need a string-based generator here. Switched to a generator with a consistent seed.

+ "."
+ RandomStringUtils.randomNumeric(floatPart)));
}
return list;
}
}
Loading