diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java index b9f81d94e..d7640f237 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java @@ -112,6 +112,14 @@ public interface SnowflakeConnectionService { */ void appendColumnsToTable(String tableName, Map columnInfosMap); + /** + * Alter iceberg table to modify columns datatype + * + * @param tableName the name of the table + * @param columnInfosMap the mapping from the columnNames to their columnInfos + */ + void alterColumnsDataTypeIcebergTable(String tableName, Map columnInfosMap); + /** * Alter iceberg table to add columns according to a map from columnNames to their types * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index d876afc3d..51171e928 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeConnectionV1; import net.snowflake.client.jdbc.SnowflakeDriver; import net.snowflake.client.jdbc.cloud.storage.StageInfo; @@ -498,6 +499,39 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) { return hasPermission; } + /** + * Alter iceberg table to modify columns datatype + * + * @param tableName the name of the table + * @param columnInfosMap the mapping from the columnNames to their infos + */ + @Override + public void alterColumnsDataTypeIcebergTable( + String tableName, Map columnInfosMap) { + LOGGER.debug("Modifying data types of iceberg table columns"); + String alterSetDatatypeQuery = generateAlterSetDataTypeQuery(columnInfosMap); + executeStatement(tableName, alterSetDatatypeQuery); + } + + private String generateAlterSetDataTypeQuery(Map columnsToModify) { + StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg "); + setDataTypeQuery.append("table identifier(?) alter column "); + + String columnsPart = + columnsToModify.entrySet().stream() + .map( + column -> { + String columnName = column.getKey(); + String dataType = column.getValue().getColumnType(); + return columnName + " set data type " + dataType; + }) + .collect(Collectors.joining(", ")); + + setDataTypeQuery.append(columnsPart); + + return setDataTypeQuery.toString(); + } + /** * Alter table to add columns according to a map from columnNames to their types * @@ -552,18 +586,22 @@ private void appendColumnsToTable( logColumn.append(columnName).append(" (").append(columnInfosMap.get(columnName)).append(")"); } + executeStatement(tableName, appendColumnQuery.toString()); + + logColumn.insert(0, "Following columns created for table {}:\n").append("]"); + LOGGER.info(logColumn.toString(), tableName); + } + + private void executeStatement(String tableName, String query) { try { - LOGGER.info("Trying to run query: {}", appendColumnQuery.toString()); - PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString()); + LOGGER.info("Trying to run query: {}", query); + PreparedStatement stmt = conn.prepareStatement(query); stmt.setString(1, tableName); stmt.execute(); stmt.close(); } catch (SQLException e) { throw SnowflakeErrors.ERROR_2015.getException(e); } - - logColumn.insert(0, "Following columns created for table {}:\n").append("]"); - LOGGER.info(logColumn.toString(), tableName); } /** diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 4406cd608..39f098289 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -339,7 +339,8 @@ public enum SnowflakeErrors { "Timeout while waiting for file cleaner to start", "Could not allocate thread for file cleaner to start processing in given time. If problem" + " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"), - ; + ERROR_5025( + "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."); // properties diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java index 7f37878a2..f092ebd1d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java @@ -684,7 +684,8 @@ public InsertRowsResponse get() throws Throwable { LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems); schemaEvolutionService.evolveSchemaIfNeeded( schemaEvolutionTargetItems, - this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx)); + this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx), + channel.getTableSchema()); // Offset reset needed since it's possible that we successfully ingested partial batch needToResetOffset = true; break; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java index 595d413d8..d9e2f87ce 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java @@ -51,10 +51,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; -import net.snowflake.ingest.streaming.InsertValidationResponse; -import net.snowflake.ingest.streaming.OpenChannelRequest; -import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; -import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; +import net.snowflake.ingest.streaming.*; import net.snowflake.ingest.utils.SFException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; @@ -539,7 +536,8 @@ private void handleInsertRowFailure( SchemaEvolutionTargetItems schemaEvolutionTargetItems = insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName()); if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) { - schemaEvolutionService.evolveSchemaIfNeeded(schemaEvolutionTargetItems, kafkaSinkRecord); + schemaEvolutionService.evolveSchemaIfNeeded( + schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema()); streamingApiFallbackSupplier( StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK); return; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java index b20f3956e..8be59e787 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java @@ -1,5 +1,7 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution; +import java.util.Map; +import net.snowflake.ingest.streaming.internal.ColumnProperties; import org.apache.kafka.connect.sink.SinkRecord; public interface SchemaEvolutionService { @@ -11,6 +13,10 @@ public interface SchemaEvolutionService { * @param targetItems target items for schema evolution such as table name, columns to drop * nullability, and columns to add * @param record the sink record that contains the schema and actual data + * @param existingSchema schema stored in a channel */ - void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record); + void evolveSchemaIfNeeded( + SchemaEvolutionTargetItems targetItems, + SinkRecord record, + Map existingSchema); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java new file mode 100644 index 000000000..fc138084e --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java @@ -0,0 +1,26 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Map; + +class IcebergColumnJsonValuePair { + private final String columnName; + private final JsonNode jsonNode; + + static IcebergColumnJsonValuePair from(Map.Entry field) { + return new IcebergColumnJsonValuePair(field.getKey(), field.getValue()); + } + + IcebergColumnJsonValuePair(String columnName, JsonNode jsonNode) { + this.columnName = columnName; + this.jsonNode = jsonNode; + } + + String getColumnName() { + return columnName; + } + + JsonNode getJsonNode() { + return jsonNode; + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnSchema.java similarity index 60% rename from src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java rename to src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnSchema.java index 650ab64d7..5107900d0 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnSchema.java @@ -3,22 +3,22 @@ import org.apache.iceberg.types.Type; /** Wrapper class for Iceberg schema retrieved from channel. */ -public class ApacheIcebergColumnSchema { +class IcebergColumnSchema { private final Type schema; private final String columnName; - public ApacheIcebergColumnSchema(Type schema, String columnName) { + IcebergColumnSchema(Type schema, String columnName) { this.schema = schema; - this.columnName = columnName.toUpperCase(); + this.columnName = columnName; } - public Type getSchema() { + Type getSchema() { return schema; } - public String getColumnName() { + String getColumnName() { return columnName; } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java index 93495cde7..03eb2d36b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java @@ -1,16 +1,19 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; /** Class with object types compatible with Snowflake Iceberg table */ -public class IcebergColumnTree { +class IcebergColumnTree { private final IcebergFieldNode rootNode; - public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) { - this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema()); + String getColumnName() { + return rootNode.name; } - public String buildQuery() { - StringBuilder sb = new StringBuilder(); - return rootNode.buildQuery(sb, "ROOT_NODE").toString(); + IcebergFieldNode getRootNode() { + return rootNode; + } + + IcebergColumnTree(IcebergFieldNode rootNode) { + this.rootNode = rootNode; } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java new file mode 100644 index 000000000..4cd2a72bc --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java @@ -0,0 +1,169 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.snowflake.kafka.connector.internal.KCLogger; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +public class IcebergColumnTreeFactory { + + private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeFactory.class.getName()); + + private final IcebergColumnTypeMapper mapper; + + public IcebergColumnTreeFactory() { + this.mapper = new IcebergColumnTypeMapper(); + } + + IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) { + LOGGER.debug( + "Attempting to parse schema from schema stored in a channel for column: " + + columnSchema.getColumnName()); + IcebergFieldNode rootNode = + createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema()); + return new IcebergColumnTree(rootNode); + } + + IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) { + LOGGER.debug( + "Attempting to parse schema from records payload for column: " + pair.getColumnName()); + IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode()); + return new IcebergColumnTree(rootNode); + } + + IcebergColumnTree fromConnectSchema(Field kafkaConnectField) { + LOGGER.debug( + "Attempting to parse schema from schema attached to a record for column: " + + kafkaConnectField.name()); + IcebergFieldNode rootNode = + createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema()); + return new IcebergColumnTree(rootNode); + } + + // -- parse tree from Iceberg schema logic -- + private IcebergFieldNode createNode(String name, Type apacheIcebergSchema) { + String snowflakeType = mapper.mapToColumnTypeFromIcebergSchema(apacheIcebergSchema); + return new IcebergFieldNode(name, snowflakeType, produceChildren(apacheIcebergSchema)); + } + + private LinkedHashMap produceChildren(Type apacheIcebergSchema) { + // primitives must not have children + if (apacheIcebergSchema.isPrimitiveType()) { + return new LinkedHashMap<>(); + } + Type.NestedType nestedField = apacheIcebergSchema.asNestedType(); + return nestedField.fields().stream() + .collect( + Collectors.toMap( + Types.NestedField::name, + field -> createNode(field.name(), field.type()), + // It's impossible to have two same keys + (v1, v2) -> { + throw new IllegalArgumentException("Two same keys: " + v1); + }, + LinkedHashMap::new)); + } + + // -- parse tree from kafka record payload logic -- + private IcebergFieldNode createNode(String name, JsonNode jsonNode) { + String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode); + return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode)); + } + + private LinkedHashMap produceChildren(JsonNode recordNode) { + if (recordNode.isNull()) { + return new LinkedHashMap<>(); + } + if (recordNode.isArray()) { + ArrayNode arrayNode = (ArrayNode) recordNode; + return produceChildrenFromArray(arrayNode); + } + if (recordNode.isObject()) { + ObjectNode objectNode = (ObjectNode) recordNode; + return produceChildrenFromObject(objectNode); + } + return new LinkedHashMap<>(); + } + + private LinkedHashMap produceChildrenFromArray(ArrayNode arrayNode) { + JsonNode arrayElement = arrayNode.get(0); + // VARCHAR is set for an empty array: [] -> ARRAY(VARCHAR) + if (arrayElement == null) { + LinkedHashMap child = new LinkedHashMap<>(); + child.put( + "element", new IcebergFieldNode("element", "VARCHAR(16777216)", new LinkedHashMap<>())); + return child; + } + LinkedHashMap child = new LinkedHashMap<>(); + child.put("element", createNode("element", arrayElement)); + return child; + } + + private LinkedHashMap produceChildrenFromObject(ObjectNode objectNode) { + return objectNode.properties().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + stringJsonNodeEntry -> + createNode(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue()), + (v1, v2) -> { + throw new IllegalArgumentException("Two same keys: " + v1); + }, + LinkedHashMap::new)); + } + + // -- parse tree from kafka record schema logic -- + private IcebergFieldNode createNode(String name, Schema schema) { + String snowflakeType = + mapper.mapToColumnTypeFromKafkaSchema(schema.schema().type(), schema.schema().name()); + return new IcebergFieldNode(name, snowflakeType, produceChildren(schema.schema())); + } + + private LinkedHashMap produceChildren(Schema connectSchema) { + if (connectSchema.type() == Schema.Type.STRUCT) { + return produceChildrenFromStruct(connectSchema); + } + if (connectSchema.type() == Schema.Type.MAP) { + return produceChildrenFromMap(connectSchema); + } + if (connectSchema.type() == Schema.Type.ARRAY) { + return produceChildrenForArray(connectSchema); + } else { // isPrimitive == true + return new LinkedHashMap<>(); + } + } + + private LinkedHashMap produceChildrenForArray( + Schema connectSchemaForArray) { + LinkedHashMap child = new LinkedHashMap<>(); + child.put("element", createNode("element", connectSchemaForArray.valueSchema())); + return child; + } + + private LinkedHashMap produceChildrenFromStruct(Schema connectSchema) { + return connectSchema.fields().stream() + .collect( + Collectors.toMap( + Field::name, + f -> createNode(f.name(), f.schema()), + (v1, v2) -> { + throw new IllegalArgumentException("Two same keys: " + v1); + }, + LinkedHashMap::new)); + } + + private LinkedHashMap produceChildrenFromMap(Schema connectSchema) { + LinkedHashMap keyValue = new LinkedHashMap<>(); + // these names will not be used when creating a query + keyValue.put("key", createNode("key", connectSchema.keySchema())); + keyValue.put("value", createNode("value", connectSchema.valueSchema())); + return keyValue; + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeMerger.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeMerger.java new file mode 100644 index 000000000..be2396f2d --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeMerger.java @@ -0,0 +1,39 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import com.google.common.base.Preconditions; +import com.snowflake.kafka.connector.internal.KCLogger; + +public class IcebergColumnTreeMerger { + + private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeMerger.class.getName()); + + /** + * Method designed for unstructured data types. Enhances already existing unstructured columns + * with new subfields. + */ + void merge(IcebergColumnTree currentTree, IcebergColumnTree treeWithNewType) { + validate(currentTree, treeWithNewType); + LOGGER.debug("Attempting to apply changes for column:" + currentTree.getColumnName()); + + merge(currentTree.getRootNode(), treeWithNewType.getRootNode()); + } + + /** Method adds new children to a node. It does not change anything else. */ + private void merge(IcebergFieldNode currentNode, IcebergFieldNode nodeToMerge) { + nodeToMerge.children.forEach( + (key, node) -> { + IcebergFieldNode currentNodesChild = currentNode.children.get(key); + if (currentNodesChild == null) { + currentNode.children.put(key, node); + } else { + merge(currentNodesChild, node); + } + }); + } + + private void validate(IcebergColumnTree currentTree, IcebergColumnTree treeWithNewType) { + Preconditions.checkArgument( + currentTree.getColumnName().equals(treeWithNewType.getColumnName()), + "Error merging column schemas. Tried to merge schemas for two different columns"); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java new file mode 100644 index 000000000..bc25cc72d --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java @@ -0,0 +1,58 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +public class IcebergColumnTreeTypeBuilder { + + private static final String ROOT_NODE_TYPE = "ROOT_NODE"; + + /** Returns data type of the column */ + String buildType(IcebergColumnTree columnTree) { + StringBuilder sb = new StringBuilder(); + IcebergFieldNode rootNode = columnTree.getRootNode(); + return buildType(sb, rootNode, ROOT_NODE_TYPE).toString(); + } + + /** + * Generate Snow SQL type for the column. + * + * @param sb StringBuilder + * @param parentType Snowflake Iceberg table compatible type. ROOT_NODE_TYPE is a special case, + * here we never generate column name for it. + * @return SQL type of the column + */ + private StringBuilder buildType(StringBuilder sb, IcebergFieldNode fieldNode, String parentType) { + if (parentType.equals("ARRAY") + || parentType.equals("MAP") + || parentType.equals(ROOT_NODE_TYPE)) { + sb.append(fieldNode.snowflakeIcebergType); + } else { + appendNameAndType(sb, fieldNode); + } + if (!fieldNode.children.isEmpty()) { + sb.append("("); + appendChildren(sb, fieldNode); + sb.append(")"); + } + return sb; + } + + private void appendNameAndType(StringBuilder sb, IcebergFieldNode fieldNode) { + sb.append(fieldNode.name); + sb.append(" "); + sb.append(fieldNode.snowflakeIcebergType); + } + + private void appendChildren(StringBuilder sb, IcebergFieldNode parentNode) { + String parentType = parentNode.snowflakeIcebergType; + parentNode.children.forEach( + (name, childNode) -> { + buildType(sb, childNode, parentType); + sb.append(", "); + }); + removeLastSeparator(sb); + } + + private void removeLastSeparator(StringBuilder sb) { + sb.deleteCharAt(sb.length() - 1); + sb.deleteCharAt(sb.length() - 1); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java index b0a6c9e74..4af7602c4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java @@ -10,7 +10,7 @@ import static org.apache.kafka.connect.data.Schema.Type.STRUCT; import com.fasterxml.jackson.databind.JsonNode; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper; +import com.snowflake.kafka.connector.internal.SnowflakeErrors; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.kafka.connect.data.Date; @@ -19,23 +19,23 @@ import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; -public class IcebergColumnTypeMapper extends ColumnTypeMapper { +class IcebergColumnTypeMapper { /** * See Data types for * Apache Iceberg™ tables */ - public static String mapToSnowflakeDataType(Type apacheIcebergType) { + String mapToColumnTypeFromIcebergSchema(Type apacheIcebergType) { switch (apacheIcebergType.typeId()) { case BOOLEAN: return "BOOLEAN"; case INTEGER: - return "NUMBER(10,0)"; + return "INT"; case LONG: - return "NUMBER(19,0)"; + return "LONG"; case FLOAT: case DOUBLE: - return "FLOAT"; + return "DOUBLE"; case DATE: return "DATE"; case TIME: @@ -61,13 +61,23 @@ public static String mapToSnowflakeDataType(Type apacheIcebergType) { case MAP: return "MAP"; default: - throw new IllegalArgumentException( - "Fail unsupported datatype! - " + apacheIcebergType.typeId()); + throw SnowflakeErrors.ERROR_5025.getException( + "Data type: " + apacheIcebergType.typeId().name()); } } - @Override - public String mapToColumnType(Schema.Type kafkaType, String schemaName) { + /** + * Method to convert datatype read from a record to column type used in Snowflake. This used for a + * code path without available schema. + * + *

Converts Types from: JsonNode -> KafkaKafka -> Snowflake. + */ + String mapToColumnTypeFromJson(JsonNode value) { + Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(value); + return mapToColumnTypeFromKafkaSchema(kafkaType, null); + } + + String mapToColumnTypeFromKafkaSchema(Schema.Type kafkaType, String schemaName) { switch (kafkaType) { case INT8: case INT16: @@ -101,9 +111,15 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { return "BINARY"; } case ARRAY: + return "ARRAY"; + case STRUCT: + return "OBJECT"; + case MAP: + return "MAP"; default: - // MAP and STRUCT will go here - throw new IllegalArgumentException("Arrays, struct and map not supported!"); + // todo try to throw info about a whole record - this is pure + throw new IllegalArgumentException( + "Error parsing datatype from Kafka record: " + kafkaType); } } @@ -113,8 +129,7 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { * @param value JSON node * @return Kafka type */ - @Override - public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { + Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { if (value == null || value.isNull()) { return STRING; } else if (value.isNumber()) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java index 6ffbd89cd..be090a8b3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java @@ -16,7 +16,7 @@ * This class is used to Iceberg data type (include primitive types and nested types) serialization * and deserialization. */ -public class IcebergDataTypeParser { +class IcebergDataTypeParser { public static final String ELEMENT = "element"; public static final String KEY = "key"; public static final String VALUE = "value"; @@ -46,7 +46,7 @@ public class IcebergDataTypeParser { * @param icebergDataType string representation of Iceberg data type * @return Iceberg data type */ - public static Type deserializeIcebergType(String icebergDataType) { + static Type deserializeIcebergType(String icebergDataType) { try { JsonNode json = MAPPER.readTree(icebergDataType); return getTypeFromJson(json); @@ -62,7 +62,7 @@ public static Type deserializeIcebergType(String icebergDataType) { * @param jsonNode JsonNode parsed from Iceberg type string. * @return Iceberg data type */ - public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { + static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { if (jsonNode.isTextual()) { return Types.fromPrimitiveString(jsonNode.asText()); } else if (jsonNode.isObject()) { @@ -91,7 +91,7 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { * @param json JsonNode parsed from Iceberg type string. * @return struct type */ - public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) { + static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) { if (!json.has(FIELDS)) { throw new IllegalArgumentException( String.format("Missing key '%s' in schema: %s", FIELDS, json)); @@ -137,7 +137,7 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { * @param json JsonNode parsed from Iceberg type string. * @return list type */ - public static Types.ListType listFromJson(JsonNode json) { + static Types.ListType listFromJson(JsonNode json) { int elementId = JsonUtil.getInt(ELEMENT_ID, json); Type elementType = getTypeFromJson(json.get(ELEMENT)); boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json); @@ -155,7 +155,7 @@ public static Types.ListType listFromJson(JsonNode json) { * @param json JsonNode parsed from Iceberg type string. * @return map type */ - public static Types.MapType mapFromJson(JsonNode json) { + static Types.MapType mapFromJson(JsonNode json) { int keyId = JsonUtil.getInt(KEY_ID, json); Type keyType = getTypeFromJson(json.get(KEY)); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java index fdca41d22..d93525078 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java @@ -1,86 +1,19 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; import java.util.LinkedHashMap; -import java.util.stream.Collectors; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; class IcebergFieldNode { - public final String name; + final String name; - public final String snowflakeIcebergType; + final String snowflakeIcebergType; - public final LinkedHashMap children; + final LinkedHashMap children; - IcebergFieldNode(String name, Type apacheIcebergSchema) { + public IcebergFieldNode( + String name, String snowflakeIcebergType, LinkedHashMap children) { this.name = name; - this.snowflakeIcebergType = IcebergColumnTypeMapper.mapToSnowflakeDataType(apacheIcebergSchema); - this.children = produceChildren(apacheIcebergSchema); - } - - private LinkedHashMap produceChildren(Type apacheIcebergSchema) { - // primitives must not have children - if (apacheIcebergSchema.isPrimitiveType()) { - return new LinkedHashMap<>(); - } - Type.NestedType nestedField = apacheIcebergSchema.asNestedType(); - return nestedField.fields().stream() - .collect( - Collectors.toMap( - Types.NestedField::name, - this::fromNestedField, - // It's impossible to have two same keys - (v1, v2) -> { - throw new IllegalArgumentException("Two same keys: " + v1); - }, - LinkedHashMap::new)); - } - - private IcebergFieldNode fromNestedField(Types.NestedField field) { - return new IcebergFieldNode(field.name(), field.type()); - } - - /** - * @param sb StringBuilder - * @param parentType Snowflake Iceberg table compatible type. If a root node is a parent then - * "ROOT_NODE" is passed, because we always generate root nodes column name. - * @return StringBuilder with appended query elements - */ - StringBuilder buildQuery(StringBuilder sb, String parentType) { - if (parentType.equals("ARRAY") || parentType.equals("MAP")) { - sb.append(snowflakeIcebergType); - } else { - appendNameAndType(sb); - } - if (!children.isEmpty()) { - sb.append("("); - appendChildren(sb, this.snowflakeIcebergType); - sb.append(")"); - } - return sb; - } - - private StringBuilder appendNameAndType(StringBuilder sb) { - sb.append(name); - sb.append(" "); - sb.append(snowflakeIcebergType); - return sb; - } - - private StringBuilder appendChildren(StringBuilder sb, String parentType) { - children.forEach( - (name, node) -> { - node.buildQuery(sb, parentType); - sb.append(", "); - }); - removeLastSeparator(sb); - return sb; - } - - private StringBuilder removeLastSeparator(StringBuilder sb) { - sb.deleteCharAt(sb.length() - 1); - sb.deleteCharAt(sb.length() - 1); - return sb; + this.snowflakeIcebergType = snowflakeIcebergType; + this.children = children; } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java index ed89c4bd7..99aa4fe1d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java @@ -1,13 +1,14 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; +import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnInfos; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionService; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionTargetItems; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.TableSchema; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.TableSchemaResolver; -import java.util.List; +import java.util.*; +import java.util.stream.Collectors; +import net.snowflake.ingest.streaming.internal.ColumnProperties; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,48 +18,171 @@ public class IcebergSchemaEvolutionService implements SchemaEvolutionService { private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSchemaEvolutionService.class); private final SnowflakeConnectionService conn; - private final TableSchemaResolver tableSchemaResolver; + private final IcebergTableSchemaResolver icebergTableSchemaResolver; + private final IcebergColumnTreeMerger mergeTreeService; + private final IcebergColumnTreeTypeBuilder typeBuilder; public IcebergSchemaEvolutionService(SnowflakeConnectionService conn) { this.conn = conn; - this.tableSchemaResolver = new IcebergTableSchemaResolver(); - } - - @VisibleForTesting - IcebergSchemaEvolutionService( - SnowflakeConnectionService conn, TableSchemaResolver tableSchemaResolver) { - this.conn = conn; - this.tableSchemaResolver = tableSchemaResolver; + this.icebergTableSchemaResolver = new IcebergTableSchemaResolver(); + this.mergeTreeService = new IcebergColumnTreeMerger(); + this.typeBuilder = new IcebergColumnTreeTypeBuilder(); } /** - * Execute an ALTER TABLE command if there is any extra column that needs to be added, or any - * column nullability that needs to be updated, used by schema evolution - * - * @param targetItems target items for schema evolution such as table name, columns to drop, - * columns to add - * @param record the sink record that contains the schema and actual data + * @param targetItems column and field names from InsertError returned by ingest-sdk + * @param record record that caused an error + * @param existingSchema schema stored in a channel */ @Override - public void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record) { + public void evolveSchemaIfNeeded( + SchemaEvolutionTargetItems targetItems, + SinkRecord record, + Map existingSchema) { String tableName = targetItems.getTableName(); - List columnsToAdd = targetItems.getColumnsToAdd(); + + Set columnsToEvolve = extractColumnNames(targetItems); + // Add columns if needed, ignore any exceptions since other task might be succeeded - if (!columnsToAdd.isEmpty()) { - LOGGER.debug("Adding columns to iceberg table: {} columns: {}", tableName, columnsToAdd); - TableSchema tableSchema = - tableSchemaResolver.resolveTableSchemaFromRecord(record, columnsToAdd); - try { - conn.appendColumnsToIcebergTable(tableName, tableSchema.getColumnInfos()); - } catch (SnowflakeKafkaConnectorException e) { - LOGGER.warn( - String.format( - "Failure altering iceberg table to add column: %s, this could happen when multiple" - + " partitions try to alter the table at the same time and the warning could be" - + " ignored", - tableName), - e); - } + if (!columnsToEvolve.isEmpty()) { + LOGGER.debug("Adding columns to iceberg table: {} columns: {}", tableName, columnsToEvolve); + // some of the column might already exist, and we will modify them, not create + List alreadyExistingColumns = + icebergTableSchemaResolver.resolveIcebergSchemaFromChannel( + existingSchema, columnsToEvolve); + + List modifiedOrAddedColumns = + icebergTableSchemaResolver.resolveIcebergSchemaFromRecord(record, columnsToEvolve); + + List columnsToAdd = + distinguishColumnsToAdd(alreadyExistingColumns, modifiedOrAddedColumns); + + List columnsToModify = + distinguishColumnsToModify(alreadyExistingColumns, modifiedOrAddedColumns); + + alterAddColumns(tableName, columnsToAdd); + + alterDataType(tableName, alreadyExistingColumns, columnsToModify); + } + } + + /** + * Get only column names, ignore nested field names, remove double quotes. + * + *

example: TEST_STRUCT.field1 -> TEST_STRUCT + */ + private Set extractColumnNames(SchemaEvolutionTargetItems targetItems) { + return targetItems.getColumnsToAdd().stream() + .map(this::removeNestedFieldNames) + .map(this::removeDoubleQuotes) + .collect(Collectors.toSet()); + } + + private String removeNestedFieldNames(String columnNameWithNestedNames) { + return columnNameWithNestedNames.split("\\.")[0]; + } + + private String removeDoubleQuotes(String columnName) { + return columnName.replaceAll("\"", ""); + } + + /** Columns that are not present in a current schema are to be added */ + private List distinguishColumnsToAdd( + List alreadyExistingColumns, + List modifiedOrAddedColumns) { + return modifiedOrAddedColumns.stream() + .filter( + modifiedOrAddedColumn -> + alreadyExistingColumns.stream() + .noneMatch( + tree -> + tree.getColumnName() + .equalsIgnoreCase(modifiedOrAddedColumn.getColumnName()))) + .collect(Collectors.toList()); + } + + /** If columns is present in a current schema it means it has to be modified */ + private List distinguishColumnsToModify( + List alreadyExistingColumns, + List modifiedOrAddedColumns) { + return modifiedOrAddedColumns.stream() + .filter( + modifiedOrAddedColumn -> + alreadyExistingColumns.stream() + .anyMatch( + tree -> + tree.getColumnName() + .equalsIgnoreCase(modifiedOrAddedColumn.getColumnName()))) + .collect(Collectors.toList()); + } + + private void alterAddColumns(String tableName, List addedColumns) { + if (addedColumns.isEmpty()) { + return; } + Map columnInfosMap = toColumnInfos(addedColumns); + try { + conn.appendColumnsToIcebergTable(tableName, columnInfosMap); + } catch (SnowflakeKafkaConnectorException e) { + logQueryFailure(tableName, e); + } + } + + private void alterDataType( + String tableName, + List alreadyExistingColumns, + List modifiedColumns) { + if (modifiedColumns.isEmpty()) { + return; + } + mergeChangesIntoExistingColumns(alreadyExistingColumns, modifiedColumns); + Map columnInfosMap = toColumnInfos(alreadyExistingColumns); + try { + conn.alterColumnsDataTypeIcebergTable(tableName, columnInfosMap); + } catch (SnowflakeKafkaConnectorException e) { + logQueryFailure(tableName, e); + } + } + + private void mergeChangesIntoExistingColumns( + List alreadyExistingColumns, List modifiedColumns) { + alreadyExistingColumns.forEach( + existingColumn -> { + List modifiedColumnMatchingExisting = + modifiedColumns.stream() + .filter(c -> c.getColumnName().equals(existingColumn.getColumnName())) + .collect(Collectors.toList()); + if (modifiedColumnMatchingExisting.size() != 1) { + LOGGER.warn( + "Skipping schema evolution of a column {}. Incorrect number of new versions of the" + + " column: {}", + existingColumn.getColumnName(), + modifiedColumnMatchingExisting.stream() + .map(IcebergColumnTree::getColumnName) + .collect(Collectors.toList())); + } + mergeTreeService.merge(existingColumn, modifiedColumnMatchingExisting.get(0)); + }); + } + + private Map toColumnInfos(List columnTrees) { + return columnTrees.stream() + .map( + columnTree -> + Maps.immutableEntry( + columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree)))) + .collect( + Collectors.toMap( + Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue)); + } + + private void logQueryFailure(String tableName, SnowflakeKafkaConnectorException e) { + LOGGER.warn( + String.format( + "Failure altering iceberg table to add column: %s, this could happen when multiple" + + " partitions try to alter the table at the same time and the warning could be" + + " ignored", + tableName), + e); } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java deleted file mode 100644 index e4ed70f3e..000000000 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; - -import java.util.List; - -/** Wrapper for multiple columns, not necessary all columns that are in the table */ -public class IcebergTableSchema { - - private List columns; -} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java index e1a7e3a8b..cf4df17e8 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java @@ -1,16 +1,134 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; -import com.google.common.annotations.VisibleForTesting; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.TableSchemaResolver; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; +import com.snowflake.kafka.connector.internal.SnowflakeErrors; +import com.snowflake.kafka.connector.records.RecordService; +import java.util.*; +import java.util.stream.Collectors; +import net.snowflake.ingest.streaming.internal.ColumnProperties; +import org.apache.iceberg.types.Type; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class IcebergTableSchemaResolver extends TableSchemaResolver { +class IcebergTableSchemaResolver { + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableSchemaResolver.class); + private final IcebergColumnTreeFactory treeFactory; - @VisibleForTesting - IcebergTableSchemaResolver(IcebergColumnTypeMapper columnTypeMapper) { - super(columnTypeMapper); + public IcebergTableSchemaResolver() { + this.treeFactory = new IcebergColumnTreeFactory(); } - public IcebergTableSchemaResolver() { - super(new IcebergColumnTypeMapper()); + /** + * Retrieve IcebergSchema stored in a channel, then parse it into a tree. Filter out columns that + * do not need to be modified. + */ + public List resolveIcebergSchemaFromChannel( + Map tableSchemaFromChannel, Set columnsToEvolve) { + + return tableSchemaFromChannel.entrySet().stream() + .filter( + (schemaFromChannelEntry) -> { + String columnNameFromChannel = schemaFromChannelEntry.getKey(); + return columnsToEvolve.contains(columnNameFromChannel); + }) + .map(this::mapIcebergSchemaFromChannel) + .map(treeFactory::fromIcebergSchema) + .collect(Collectors.toList()); + } + + public List resolveIcebergSchemaFromRecord( + SinkRecord record, Set columnsToEvolve) { + if (columnsToEvolve == null || columnsToEvolve.isEmpty()) { + return ImmutableList.of(); + } + if (hasSchema(record)) { + LOGGER.debug( + "Schema found. Evolve columns basing on a record's schema, column: {}", columnsToEvolve); + return getTableSchemaFromRecordSchema(record, columnsToEvolve); + } else { + LOGGER.debug( + "Schema NOT found. Evolve columns basing on a record's payload, columns: {}", + columnsToEvolve); + return getTableSchemaFromJson(record, columnsToEvolve); + } + } + + private IcebergColumnSchema mapIcebergSchemaFromChannel( + Map.Entry schemaFromChannelEntry) { + + ColumnProperties columnProperty = schemaFromChannelEntry.getValue(); + String plainIcebergSchema = getIcebergSchema(columnProperty); + + Type schema = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema); + String columnName = schemaFromChannelEntry.getKey(); + return new IcebergColumnSchema(schema, columnName); + } + + // todo remove in 1820155 when getIcebergSchema() method is made public + private static String getIcebergSchema(ColumnProperties columnProperties) { + try { + java.lang.reflect.Field field = + columnProperties.getClass().getDeclaredField("icebergColumnSchema"); + field.setAccessible(true); + return (String) field.get(columnProperties); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new IllegalStateException( + "Couldn't set iceberg by accessing private field: isIceberg", e); + } + } + + private boolean hasSchema(SinkRecord record) { + return record.valueSchema() != null + && record.valueSchema().fields() != null + && !record.valueSchema().fields().isEmpty(); + } + + private List getTableSchemaFromJson( + SinkRecord record, Set columnsToEvolve) { + JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); + + return Streams.stream(recordNode.fields()) + .map(IcebergColumnJsonValuePair::from) + .filter(pair -> columnsToEvolve.contains(pair.getColumnName().toUpperCase())) + .map(treeFactory::fromJson) + .collect(Collectors.toList()); + } + + /** + * Given a SinkRecord, get the schema information from it + * + * @param record the sink record that contains the schema and actual data + * @return list of column representation in a form of tree + */ + private List getTableSchemaFromRecordSchema( + SinkRecord record, Set columnsToEvolve) { + + List schemaColumns = record.valueSchema().fields(); + List foundColumns = + schemaColumns.stream() + .filter( + schemaColumnName -> columnsToEvolve.contains(schemaColumnName.name().toUpperCase())) + .collect(Collectors.toList()); + + if (foundColumns.size() < columnsToEvolve.size()) { + List notFoundColumns = + schemaColumns.stream() + .map(Field::name) + .filter(schemaColumnName -> !columnsToEvolve.contains(schemaColumnName.toUpperCase())) + .collect(Collectors.toList()); + + throw SnowflakeErrors.ERROR_5022.getException( + "Columns not found in schema: " + + notFoundColumns + + ", schemaColumns: " + + schemaColumns.stream().map(Field::name).collect(Collectors.toList()) + + ", foundColumns: " + + foundColumns.stream().map(Field::name).collect(Collectors.toList())); + } + return foundColumns.stream().map(treeFactory::fromConnectSchema).collect(Collectors.toList()); } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeSchemaEvolutionService.java index 4feb1bd5e..f55e4e63f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeSchemaEvolutionService.java @@ -8,6 +8,8 @@ import com.snowflake.kafka.connector.internal.streaming.schemaevolution.TableSchema; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.TableSchemaResolver; import java.util.List; +import java.util.Map; +import net.snowflake.ingest.streaming.internal.ColumnProperties; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +41,13 @@ public SnowflakeSchemaEvolutionService(SnowflakeConnectionService conn) { * @param targetItems target items for schema evolution such as table name, columns to drop, * columns to add * @param record the sink record that contains the schema and actual data + * @param existingSchema is unused in this implementation */ @Override - public void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record) { + public void evolveSchemaIfNeeded( + SchemaEvolutionTargetItems targetItems, + SinkRecord record, + Map existingSchema) { String tableName = targetItems.getTableName(); List columnsToDropNullability = targetItems.getColumnsToDropNonNullability(); // Update nullability if needed, ignore any exceptions since other task might be succeeded diff --git a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java index 99b566cf0..eb1fa1071 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java @@ -13,7 +13,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.stream.Collectors; class IcebergTableStreamingRecordMapper extends StreamingRecordMapper { private static final TypeReference> OBJECTS_MAP_TYPE_REFERENCE = @@ -54,7 +53,7 @@ private Map getMapForSchematization(JsonNode node) { entry -> new AbstractMap.SimpleEntry<>( Utils.quoteNameIfNeeded(entry.getKey()), entry.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue()), HashMap::putAll); } private Map getMapForMetadata(JsonNode metadataNode) diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java index f049bf606..4b2ad16d1 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java @@ -1,7 +1,6 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; @@ -23,7 +22,8 @@ class IcebergColumnTypeMapperTest { @MethodSource("kafkaTypesToMap") void shouldMapKafkaTypeToSnowflakeColumnType( Schema.Type kafkaType, String schemaName, String expectedSnowflakeType) { - assertThat(mapper.mapToColumnType(kafkaType, schemaName)).isEqualTo(expectedSnowflakeType); + assertThat(mapper.mapToColumnTypeFromKafkaSchema(kafkaType, schemaName)) + .isEqualTo(expectedSnowflakeType); } @ParameterizedTest() @@ -32,13 +32,6 @@ void shouldMapJsonNodeTypeToKafkaType(JsonNode value, Schema.Type expectedKafkaT assertThat(mapper.mapJsonNodeTypeToKafkaType(value)).isEqualTo(expectedKafkaType); } - @ParameterizedTest() - @MethodSource("kafkaTypesToThrowException") - void shouldThrowExceptionWhenMappingUnsupportedKafkaType(Schema.Type kafkaType) { - assertThatThrownBy(() -> mapper.mapToColumnType(kafkaType, null)) - .isInstanceOf(IllegalArgumentException.class); - } - private static Stream kafkaTypesToMap() { return Stream.of( Arguments.of(Schema.Type.INT8, null, "INT"), @@ -53,7 +46,10 @@ private static Stream kafkaTypesToMap() { Arguments.of(Schema.Type.BOOLEAN, null, "BOOLEAN"), Arguments.of(Schema.Type.STRING, null, "VARCHAR"), Arguments.of(Schema.Type.BYTES, Decimal.LOGICAL_NAME, "VARCHAR"), - Arguments.of(Schema.Type.BYTES, null, "BINARY")); + Arguments.of(Schema.Type.BYTES, null, "BINARY"), + Arguments.of(Schema.Type.MAP, null, "MAP"), + Arguments.of(Schema.Type.ARRAY, null, "ARRAY"), + Arguments.of(Schema.Type.STRUCT, null, "OBJECT")); } private static Stream kafkaTypesToThrowException() { diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java new file mode 100644 index 000000000..328210378 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java @@ -0,0 +1,242 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import com.fasterxml.jackson.databind.JsonNode; +import com.snowflake.kafka.connector.records.RecordService; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.stream.Stream; +import org.apache.iceberg.types.Type; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class ParseIcebergColumnTreeTest { + + private final IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory(); + private final IcebergColumnTreeMerger mergeTreeService = new IcebergColumnTreeMerger(); + private final IcebergColumnTreeTypeBuilder typeBuilder = new IcebergColumnTreeTypeBuilder(); + + @ParameterizedTest + @MethodSource("icebergSchemas") + void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedType) { + // given + Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema); + // when + IcebergColumnSchema apacheSchema = new IcebergColumnSchema(type, "TEST_COLUMN_NAME"); + IcebergColumnTree tree = treeFactory.fromIcebergSchema(apacheSchema); + // then + Assertions.assertEquals(expectedType, typeBuilder.buildType(tree)); + Assertions.assertEquals("TEST_COLUMN_NAME", tree.getColumnName()); + } + + static Stream icebergSchemas() { + return Stream.of( + // primitives + arguments("\"boolean\"", "BOOLEAN"), + arguments("\"int\"", "INT"), + arguments("\"long\"", "LONG"), + arguments("\"float\"", "DOUBLE"), + arguments("\"double\"", "DOUBLE"), + arguments("\"date\"", "DATE"), + arguments("\"time\"", "TIME(6)"), + arguments("\"timestamptz\"", "TIMESTAMP_LTZ"), + arguments("\"timestamp\"", "TIMESTAMP"), + arguments("\"string\"", "VARCHAR(16777216)"), + arguments("\"uuid\"", "BINARY(16)"), + arguments("\"binary\"", "BINARY"), + arguments("\"decimal(10,5)\"", "DECIMAL(10, 5)"), + // simple struct + arguments( + "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}", + "OBJECT(k1 INT, k2 INT)"), + // list + arguments( + "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}", + "ARRAY(LONG)"), + arguments( + "{\"type\":\"list\",\"element-id\":1,\"element\":{\"type\":\"struct\",\"fields\":[{\"id\":1,\"name\":\"primitive\",\"required\":true,\"type\":\"boolean\"}]},\"element-required\":true}", + "ARRAY(OBJECT(primitive BOOLEAN))"), + // map + arguments( + "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}", + "MAP(INT, VARCHAR(16777216))"), + // structs with nested objects + arguments( + "{\"type\":\"struct\",\"fields\":[" + + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}," + + " {\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[" + + " {\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"}," + + " {\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}" + + "]}}]}", + "OBJECT(k1 INT, k2 INT, nested_object" + + " OBJECT(nested_key1 VARCHAR(16777216), nested_key2 VARCHAR(16777216)))"), + arguments( + "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n", + "OBJECT(offset INT, topic VARCHAR(16777216), partition" + + " INT, key VARCHAR(16777216), schema_id INT, key_schema_id" + + " INT, CreateTime LONG, LogAppendTime LONG," + + " SnowflakeConnectorPushTime LONG, headers MAP(VARCHAR(16777216)," + + " VARCHAR(16777216)))")); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("parseFromJsonArguments") + void parseFromJsonRecordSchema(String jsonString, String expectedType) { + // given + SinkRecord record = createKafkaRecord(jsonString); + JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); + IcebergColumnJsonValuePair columnValuePair = + IcebergColumnJsonValuePair.from(recordNode.fields().next()); + // when + IcebergColumnTree tree = treeFactory.fromJson(columnValuePair); + // then + Assertions.assertEquals(expectedType, typeBuilder.buildType(tree)); + Assertions.assertEquals("TESTCOLUMNNAME", tree.getColumnName()); + } + + static Stream parseFromJsonArguments() { + return Stream.of( + arguments("{\"testColumnName\" : 1 }", "LONG"), + arguments( + "{ \"testColumnName\": { \"k1\" : 1, \"k2\" : 2 } }", "OBJECT(k1 LONG, k2 LONG)"), + arguments( + "{ \"testColumnName\": {" + + "\"k1\" : { \"nested_key1\" : 1}," + + "\"k2\" : { \"nested_key2\" : 2}" + + "}}", + "OBJECT(k1 OBJECT(nested_key1 LONG), k2 OBJECT(nested_key2 LONG))"), + arguments( + "{ \"testColumnName\": {" + + "\"vehicle1\" : { \"car\" : { \"brand\" : \"vw\" } }," + + "\"vehicle2\" : { \"car\" : { \"brand\" : \"toyota\" } }" + + "}}", + "OBJECT(vehicle2 OBJECT(car OBJECT(brand VARCHAR)), " + + "vehicle1 OBJECT(car OBJECT(brand VARCHAR)))"), + arguments( + "{ \"testColumnName\": {" + + "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," + + "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" + + "}}", + "OBJECT(k1 OBJECT(car OBJECT(brand VARCHAR)), k2" + + " OBJECT(car" + + " OBJECT(brand" + + " VARCHAR)))"), + arguments( + " { \"testColumnName\": [" + + " {" + + " \"id\": 0," + + " \"name\": \"Sandoval Hodges\"" + + " }," + + " {" + + " \"id\": 1," + + " \"name\": \"Ramirez Brooks\"" + + " }," + + " {" + + " \"id\": 2," + + " \"name\": \"Vivian Whitfield\"" + + " }" + + " ] } ", + "ARRAY(OBJECT(name VARCHAR, id LONG))"), + // array + arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"), + arguments("{ \"testColumnName\": [] }", "ARRAY(VARCHAR(16777216))")); + } + + @ParameterizedTest + @MethodSource("mergeTestArguments") + void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expectedResult) { + // given tree parsed from channel + Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema); + IcebergColumnSchema apacheSchema = new IcebergColumnSchema(type, "TESTSTRUCT"); + IcebergColumnTree alreadyExistingTree = treeFactory.fromIcebergSchema(apacheSchema); + + // tree parsed from a record + SinkRecord record = createKafkaRecord(recordJson); + JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); + IcebergColumnJsonValuePair columnValuePair = + IcebergColumnJsonValuePair.from(recordNode.fields().next()); + + IcebergColumnTree modifiedTree = treeFactory.fromJson(columnValuePair); + // when + mergeTreeService.merge(alreadyExistingTree, modifiedTree); + // then + String expected = expectedResult.replaceAll("/ +/g", " "); + Assertions.assertEquals(expected, typeBuilder.buildType(alreadyExistingTree)); + Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName()); + } + + static Stream mergeTestArguments() { + return Stream.of( + arguments( + "{\"type\":\"struct\",\"fields\":[" + + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"}," + + "{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}" + + "]}", + "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }", + "OBJECT(k1 INT, k2 INT, k3 LONG)"), + arguments( + "{\"type\":\"struct\",\"fields\":[" + + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}," + + " {\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[" + + " {\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"}," + + " {\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}" + + "]}}]}", + "{\"testStruct\" : {" + + " \"k1\" : 1, " + + " \"k2\" : 2, " + + " \"nested_object\": { " + + " \"nested_key1\" : \"string\", " + + " \"nested_key2\" : \"blah\", " + + " \"nested_object2\" : { " + + " \"nested_key2\" : 23.5 " + + " }}" + + "}}", + "OBJECT(k1 INT, k2 INT, nested_object OBJECT(nested_key1" + + " VARCHAR(16777216), nested_key2 VARCHAR(16777216), nested_object2" + + " OBJECT(nested_key2 DOUBLE)))"), + // ARRAY merge + arguments( + "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}", + "{\"TESTSTRUCT\": [1,2,3] }", + "ARRAY(LONG)"), + arguments( + "{\"type\":\"list\",\"element-id\":1,\"element\":{" + + "\"type\":\"struct\",\"fields\":[" + + "{\"id\":1,\"name\":\"primitive\",\"required\":true,\"type\":\"boolean\"}" + + "]}," + + "\"element-required\":true}", + "{\"TESTSTRUCT\": [ { \"primitive\" : true, \"new_field\" : 25878749237493287429348 }]" + + " }", + "ARRAY(OBJECT(primitive BOOLEAN, new_field LONG))")); + } + + protected SinkRecord createKafkaRecord(String jsonString) { + int offset = 0; + JsonConverter converter = new JsonConverter(); + converter.configure(Collections.singletonMap("schemas.enable", Boolean.toString(false)), false); + SchemaAndValue inputValue = + converter.toConnectData("TOPIC_NAME", jsonString.getBytes(StandardCharsets.UTF_8)); + Headers headers = new ConnectHeaders(); + return new SinkRecord( + "TOPIC_NAME", + 1, + Schema.STRING_SCHEMA, + "test", + inputValue.schema(), + inputValue.value(), + offset, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + headers); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 682baef01..5b2587e2b 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -1,8 +1,6 @@ package com.snowflake.kafka.connector.streaming.iceberg; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ICEBERG_ENABLED; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; import static com.snowflake.kafka.connector.internal.TestUtils.getConfForStreaming; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; @@ -51,6 +49,8 @@ public void setUp() { config.put(ICEBERG_ENABLED, "TRUE"); config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString()); config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); + // "snowflake.streaming.max.client.lag" = 1 second, for faster tests + config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1"); createIcebergTable(); enableSchemaEvolution(tableName); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index a58d42239..72ef11e7e 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -75,7 +75,8 @@ private static Stream prepareData() { return Stream.of( Arguments.of( "Complex JSON with schema", ComplexJsonRecord.complexJsonWithSchemaExample, true), - Arguments.of("Complex JSON without schema", ComplexJsonRecord.complexJsonExample, false)); + Arguments.of( + "Complex JSON without schema", ComplexJsonRecord.complexJsonPayloadExample, false)); } @ParameterizedTest(name = "{0}") diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 08f0bf2f3..338bc4130 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -1,10 +1,10 @@ package com.snowflake.kafka.connector.streaming.iceberg; -import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.emptyPrimitiveJsonRecordValueExample; -import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; -import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonRecordValueExample; -import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonWithSchemaExample; +import static com.snowflake.kafka.connector.streaming.iceberg.TestJsons.*; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.*; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.*; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.DescribeTableRow; @@ -17,12 +17,20 @@ import java.util.stream.Stream; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; public class IcebergIngestionSchemaEvolutionIT extends IcebergIngestionIT { + private static final String RECORD_METADATA_TYPE = + "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key" + + " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0)," + + " CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0)," + + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216)," + + " VARCHAR(16777216)))"; + @Override protected Boolean isSchemaEvolutionEnabled() { return true; @@ -81,6 +89,91 @@ void shouldEvolveSchemaAndInsertRecords( assertRecordsInTable(); } + private static Stream prepareData() { + return Stream.of( + Arguments.of( + "Primitive JSON with schema", + primitiveJsonWithSchemaExample, + new DescribeTableRow[] { + new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT32", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), + new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), + new DescribeTableRow("RATING_FLOAT32", "FLOAT"), + new DescribeTableRow("RATING_FLOAT64", "FLOAT"), + new DescribeTableRow("APPROVAL", "BOOLEAN") + }, + true), + Arguments.of( + "Primitive JSON without schema", + primitiveJsonExample, + new DescribeTableRow[] { + new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT32", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), + new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), + new DescribeTableRow("RATING_FLOAT32", "FLOAT"), + new DescribeTableRow("RATING_FLOAT64", "FLOAT"), + new DescribeTableRow("APPROVAL", "BOOLEAN") + }, + false)); + } + + /** Verify a scenario when structure is enriched with another field. */ + @Test + @Disabled + public void alterStructure_noSchema() throws Exception { + // k1, k2 + String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }"; + insertWithRetry(testStruct1, 0, false); + waitForOffset(1); + + // k1, k2 + k3 + String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"foo\" } }"; + insertWithRetry(testStruct2, 1, false); + waitForOffset(2); + + // k1, k2, k3 + k4 + String testStruct3 = + "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"bar\", \"k4\" : 4.5 } }"; + insertWithRetry(testStruct3, 2, false); + waitForOffset(3); + + List columns = describeTable(tableName); + assertEquals( + columns.get(1).getType(), + "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT)"); + + // k2, k3, k4 + String testStruct4 = "{ \"testStruct\": { \"k2\" : 2, \"k3\" : 3, \"k4\" : 4.34 } }"; + insertWithRetry(testStruct4, 3, false); + waitForOffset(4); + + columns = describeTable(tableName); + assertEquals( + columns.get(1).getType(), + "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT)"); + + // k5, k6 + String testStruct5 = "{ \"testStruct\": { \"k5\" : 2, \"k6\" : 3 } }"; + insertWithRetry(testStruct5, 4, false); + waitForOffset(5); + + columns = describeTable(tableName); + assertEquals( + columns.get(1).getType(), + "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT, k5 NUMBER(19,0)," + + " k6 NUMBER(19,0))"); + assertEquals(columns.size(), 2); + } + + private void insertWithRetry(String record, int offset, boolean withSchema) { + service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema))); + service.insert(Collections.singletonList(createKafkaRecord(record, offset, withSchema))); + } + private void assertRecordsInTable() { List> recordsWithMetadata = selectAllSchematizedRecords(); @@ -107,35 +200,287 @@ private void assertRecordsInTable() { && r.getSnowflakeConnectorPushTime() != null); } - private static Stream prepareData() { + @Test + @Disabled + public void testComplexRecordEvolution_withSchema() throws Exception { + insertWithRetry(complexJsonWithSchemaExample, 0, true); + waitForOffset(1); + + List columns = describeTable(tableName); + assertEquals(columns.size(), 16); + + DescribeTableRow[] expectedSchema = + new DescribeTableRow[] { + new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), + new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT32", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), + new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), + new DescribeTableRow("RATING_FLOAT32", "FLOAT"), + new DescribeTableRow("RATING_FLOAT64", "FLOAT"), + new DescribeTableRow("APPROVAL", "BOOLEAN"), + new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(10,0))"), + new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), + new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), + new DescribeTableRow("ARRAY4", "ARRAY(NUMBER(10,0))"), + new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(10,0)))"), + new DescribeTableRow( + "NESTEDRECORD", + "OBJECT(id_int8 NUMBER(10,0), id_int16 NUMBER(10,0), id_int32 NUMBER(10,0), id_int64" + + " NUMBER(19,0), description VARCHAR(16777216), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN)"), + new DescribeTableRow( + "NESTEDRECORD2", + "OBJECT(id_int8 NUMBER(10,0), id_int16 NUMBER(10,0), id_int32 NUMBER(10,0), id_int64" + + " NUMBER(19,0), description VARCHAR(16777216), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN)"), + }; + assertThat(columns).containsExactlyInAnyOrder(expectedSchema); + } + + @Test + @Disabled + public void testComplexRecordEvolution() throws Exception { + insertWithRetry(complexJsonPayloadExample, 0, false); + waitForOffset(1); + + List columns = describeTable(tableName); + assertEquals(columns.size(), 16); + + DescribeTableRow[] expectedSchema = + new DescribeTableRow[] { + new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), + new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT32", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), + new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), + new DescribeTableRow("RATING_FLOAT32", "FLOAT"), + new DescribeTableRow("RATING_FLOAT64", "FLOAT"), + new DescribeTableRow("APPROVAL", "BOOLEAN"), + new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(19,0))"), + new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), + new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), + // "array4" : null -> VARCHAR(16777216 + new DescribeTableRow("ARRAY4", "VARCHAR(16777216)"), + new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(19,0)))"), + new DescribeTableRow( + "NESTEDRECORD", + "OBJECT(id_int8 NUMBER(19,0), id_int16 NUMBER(19,0), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN, id_int32 NUMBER(19,0), description" + + " VARCHAR(16777216), id_int64 NUMBER(19,0))"), + // "nestedRecord2": null -> VARCHAR(16777216) + new DescribeTableRow("NESTEDRECORD2", "VARCHAR(16777216)"), + }; + assertThat(columns).containsExactlyInAnyOrder(expectedSchema); + } + + /** Test just for a scenario when we see a record for the first time. */ + @ParameterizedTest + @MethodSource("schemasAndPayloads_brandNewColumns") + @Disabled + public void addBrandNewColumns_withSchema( + String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception { + // when + insertWithRetry(payloadWithSchema, 0, true); + waitForOffset(1); + // then + List columns = describeTable(tableName); + + assertEquals(2, columns.size()); + assertEquals(expectedColumnName, columns.get(1).getColumn()); + assertEquals(expectedType, columns.get(1).getType()); + } + + private static Stream schemasAndPayloads_brandNewColumns() { return Stream.of( Arguments.of( - "Primitive JSON with schema", - primitiveJsonWithSchemaExample, - new DescribeTableRow[] { - new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), - new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), - new DescribeTableRow("ID_INT32", "NUMBER(10,0)"), - new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), - new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), - new DescribeTableRow("RATING_FLOAT32", "FLOAT"), - new DescribeTableRow("RATING_FLOAT64", "FLOAT"), - new DescribeTableRow("APPROVAL", "BOOLEAN") - }, + nestedObjectWithSchema(), + "OBJECT_WITH_NESTED_OBJECTS", + "OBJECT(nestedStruct OBJECT(description VARCHAR(16777216)))"), + Arguments.of( + simpleMapWithSchema(), "SIMPLE_TEST_MAP", "MAP(VARCHAR(16777216), NUMBER(10,0))"), + Arguments.of(simpleArrayWithSchema(), "SIMPLE_ARRAY", "ARRAY(NUMBER(10,0))"), + Arguments.of( + complexPayloadWithSchema(), + "OBJECT", + "OBJECT(arrayOfMaps ARRAY(MAP(VARCHAR(16777216), FLOAT)))")); + } + + @ParameterizedTest + @MethodSource("primitiveEvolutionDataSource") + @Disabled + public void testEvolutionOfPrimitives_withSchema( + String singleBooleanField, + String booleanAndInt, + String booleanAndAllKindsOfInt, + String allPrimitives, + boolean withSchema) + throws Exception { + // when insert BOOLEAN + insertWithRetry(singleBooleanField, 0, withSchema); + waitForOffset(1); + List columns = describeTable(tableName); + // verify number of columns, datatype and column name + assertEquals(2, columns.size()); + assertEquals("TEST_BOOLEAN", columns.get(1).getColumn()); + assertEquals("BOOLEAN", columns.get(1).getType()); + + // evolve the schema BOOLEAN, INT64 + insertWithRetry(booleanAndInt, 1, withSchema); + waitForOffset(2); + columns = describeTable(tableName); + assertEquals(3, columns.size()); + // verify data types in already existing column were not changed + assertEquals("TEST_BOOLEAN", columns.get(1).getColumn()); + assertEquals("BOOLEAN", columns.get(1).getType()); + // verify new columns + assertEquals("TEST_INT64", columns.get(2).getColumn()); + assertEquals("NUMBER(19,0)", columns.get(2).getType()); + + // evolve the schema BOOLEAN, INT64, INT32, INT16, INT8, + insertWithRetry(booleanAndAllKindsOfInt, 2, withSchema); + waitForOffset(3); + columns = describeTable(tableName); + assertEquals(6, columns.size()); + // verify data types in already existing column were not changed + + // without schema every number is parsed to NUMBER(19,0) + String SMALL_INT = withSchema ? "NUMBER(10,0)" : "NUMBER(19,0)"; + DescribeTableRow[] expectedSchema = + new DescribeTableRow[] { + new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), + new DescribeTableRow("TEST_BOOLEAN", "BOOLEAN"), + new DescribeTableRow("TEST_INT8", SMALL_INT), + new DescribeTableRow("TEST_INT16", SMALL_INT), + new DescribeTableRow("TEST_INT32", SMALL_INT), + new DescribeTableRow("TEST_INT64", "NUMBER(19,0)") + }; + assertThat(columns).containsExactlyInAnyOrder(expectedSchema); + + // evolve the schema BOOLEAN, INT64, INT32, INT16, INT8, FLOAT, DOUBLE, STRING + insertWithRetry(allPrimitives, 3, withSchema); + waitForOffset(4); + columns = describeTable(tableName); + assertEquals(9, columns.size()); + + expectedSchema = + new DescribeTableRow[] { + new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), + new DescribeTableRow("TEST_BOOLEAN", "BOOLEAN"), + new DescribeTableRow("TEST_INT8", SMALL_INT), + new DescribeTableRow("TEST_INT16", SMALL_INT), + new DescribeTableRow("TEST_INT32", SMALL_INT), + new DescribeTableRow("TEST_INT64", "NUMBER(19,0)"), + new DescribeTableRow("TEST_STRING", "VARCHAR(16777216)"), + new DescribeTableRow("TEST_FLOAT", "FLOAT"), + new DescribeTableRow("TEST_DOUBLE", "FLOAT") + }; + + assertThat(columns).containsExactlyInAnyOrder(expectedSchema); + } + + private static Stream primitiveEvolutionDataSource() { + return Stream.of( + Arguments.of( + singleBooleanField(), + booleanAndIntWithSchema(), + booleanAndAllKindsOfIntWithSchema(), + allPrimitivesWithSchema(), true), Arguments.of( - "Primitive JSON without schema", - primitiveJsonExample, - new DescribeTableRow[] { - new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), - new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), - new DescribeTableRow("ID_INT32", "NUMBER(19,0)"), - new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), - new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), - new DescribeTableRow("RATING_FLOAT32", "FLOAT"), - new DescribeTableRow("RATING_FLOAT64", "FLOAT"), - new DescribeTableRow("APPROVAL", "BOOLEAN") - }, + singleBooleanFieldPayload(), + booleanAndIntPayload(), + booleanAndAllKindsOfIntPayload(), + allPrimitivesPayload(), + false)); + } + + @ParameterizedTest + @MethodSource("testEvolutionOfComplexTypes_dataSource") + @Disabled + public void testEvolutionOfComplexTypes_withSchema( + String objectVarchar, + String objectWithNestedObject, + String twoObjects, + String twoObjectsExtendedWithMapAndArray, + boolean withSchema) + throws Exception { + // insert + insertWithRetry(objectVarchar, 0, withSchema); + waitForOffset(1); + List columns = describeTable(tableName); + // verify number of columns, datatype and column name + assertEquals(2, columns.size()); + assertEquals("OBJECT", columns.get(1).getColumn()); + assertEquals("OBJECT(test_string VARCHAR(16777216))", columns.get(1).getType()); + + // evolution + insertWithRetry(objectWithNestedObject, 1, withSchema); + waitForOffset(2); + columns = describeTable(tableName); + // verify number of columns, datatype and column name + assertEquals(2, columns.size()); + assertEquals("OBJECT", columns.get(1).getColumn()); + assertEquals( + "OBJECT(test_string VARCHAR(16777216), nested_object OBJECT(test_string" + + " VARCHAR(16777216)))", + columns.get(1).getType()); + + // evolution + insertWithRetry(twoObjects, 2, withSchema); + waitForOffset(3); + columns = describeTable(tableName); + + assertEquals(3, columns.size()); + // 1st column + assertEquals("OBJECT", columns.get(1).getColumn()); + assertEquals( + "OBJECT(test_string VARCHAR(16777216), nested_object OBJECT(test_string" + + " VARCHAR(16777216)))", + columns.get(1).getType()); + // 2nd column + assertEquals("OBJECT_WITH_NESTED_OBJECTS", columns.get(2).getColumn()); + assertEquals( + "OBJECT(nestedStruct OBJECT(description VARCHAR(16777216)))", columns.get(2).getType()); + + // evolution + insertWithRetry(twoObjectsExtendedWithMapAndArray, 3, withSchema); + waitForOffset(4); + columns = describeTable(tableName); + + assertEquals(3, columns.size()); + // 1st column + assertEquals("OBJECT", columns.get(1).getColumn()); + if (withSchema) { + // MAP is not supported without schema, execute this assertion only when there is a schema, + assertEquals( + "OBJECT(test_string VARCHAR(16777216), nested_object OBJECT(test_string" + + " VARCHAR(16777216)), Test_Map MAP(VARCHAR(16777216), OBJECT(test_string" + + " VARCHAR(16777216))))", + columns.get(1).getType()); + } + // 2nd column + assertEquals("OBJECT_WITH_NESTED_OBJECTS", columns.get(2).getColumn()); + assertEquals( + "OBJECT(nestedStruct OBJECT(description VARCHAR(16777216), test_array ARRAY(FLOAT)))", + columns.get(2).getType()); + } + + private static Stream testEvolutionOfComplexTypes_dataSource() { + return Stream.of( + Arguments.of( + objectVarcharWithSchema(), + objectWithNestedObjectWithSchema(), + twoObjectsWithSchema(), + twoObjectsExtendedWithMapAndArrayWithSchema(), + true), + Arguments.of( + objectVarcharPayload, + objectWithNestedObjectPayload(), + twoObjectsWithSchemaPayload(), + twoObjectsExtendedWithMapAndArrayPayload(), false)); } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java new file mode 100644 index 000000000..449e1187e --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java @@ -0,0 +1,504 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +/** + * Class to provide Iceberg schema evolution tests with schemas and payload. It decreases the size + * of the test file . + */ +class TestJsons { + + static String nestedObjectWithSchema() { + return "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"object_With_Nested_Objects\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"nestedStruct\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + " }" + + " ]" + + " }" + + " ]," + + " \"optional\": true," + + " \"name\": \"sf.kc.test\"" + + " }," + + " \"payload\": " + + nestedObjectsPayload + + "}"; + } + + static String nestedObjectsPayload = + "{" + + " \"object_With_Nested_Objects\": {" + + " \"nestedStruct\": {" + + " \"description\": \"txt\"" + + " }" + + " }" + + " }"; + + static String simpleMapWithSchema() { + return "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"simple_Test_Map\"," + + " \"type\": \"map\"," + + " \"keys\": {" + + " \"type\": \"string\"" + + " }," + + " \"values\": {" + + " \"type\": \"int32\"" + + " }" + + " }" + + " ]" + + " }," + + " \"payload\":" + + simpleMapPayload + + "}"; + } + + static String simpleMapPayload = + "{" + + " \"simple_Test_Map\": {" + + " \"key1\": 12," + + " \"key2\": 15" + + " }" + + " }"; + + static String simpleArrayWithSchema() { + return "{\n" + + " \"schema\": {\n" + + " \"type\": \"struct\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"field\": \"simple_Array\",\n" + + " \"type\": \"array\",\n" + + " \"items\": {\n" + + " \"type\": \"int32\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"payload\":" + + simpleArrayPayload + + "}"; + } + + static String simpleArrayPayload = "{ \"simple_Array\": [ 1,2,3] } "; + + /** Object containing a list of maps */ + static String complexPayloadWithSchema() { + return "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"arrayOfMaps\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"field\": \"simple_Test_Map\"," + + " \"type\": \"map\"," + + " \"keys\": {" + + " \"type\": \"string\"" + + " }," + + " \"values\": {" + + " \"type\": \"float\"" + + " }" + + " }" + + " }" + + " ]" + + " }" + + " ]" + + " }," + + " \"payload\":" + + complexPayload + + "}"; + } + + static String complexPayload = + "{" + + " \"object\": {" + + " \"arrayOfMaps\": [" + + " {" + + " \"simple_Test_Map\": {" + + " \"keyString\": 3.14 " + + " }" + + " }" + + " ]" + + " }" + + " }"; + + static String singleBooleanField() { + return SCHEMA_BEGINNING + + BOOL_SCHEMA + + SCHEMA_END + + "\"payload\":" + + singleBooleanFieldPayload() + + "}"; + } + + static String singleBooleanFieldPayload() { + return "{" + BOOL_PAYLOAD + "}"; + } + + static String booleanAndIntWithSchema() { + return SCHEMA_BEGINNING + + BOOL_SCHEMA + + "," + + INT64_SCHEMA + + SCHEMA_END + + "\"payload\":" + + booleanAndIntPayload() + + "}"; + } + + static String booleanAndIntPayload() { + return "{" + BOOL_PAYLOAD + "," + INT64_PAYLOAD + "}"; + } + + static String booleanAndAllKindsOfIntWithSchema() { + return SCHEMA_BEGINNING + + BOOL_SCHEMA + + "," + + INT64_SCHEMA + + "," + + INT32_SCHEMA + + "," + + INT16_SCHEMA + + "," + + INT8_SCHEMA + + SCHEMA_END + + "\"payload\":" + + booleanAndAllKindsOfIntPayload() + + "}"; + } + + static String booleanAndAllKindsOfIntPayload() { + return "{" + + BOOL_PAYLOAD + + "," + + INT64_PAYLOAD + + "," + + INT32_PAYLOAD + + "," + + INT16_PAYLOAD + + "," + + INT8_PAYLOAD + + "}"; + } + + static String allPrimitivesWithSchema() { + return SCHEMA_BEGINNING + + BOOL_SCHEMA + + "," + + INT64_SCHEMA + + "," + + INT32_SCHEMA + + "," + + INT16_SCHEMA + + "," + + INT8_SCHEMA + + "," + + FLOAT_SCHEMA + + "," + + DOUBLE_SCHEMA + + "," + + STRING_SCHEMA + + SCHEMA_END + + "\"payload\":" + + allPrimitivesPayload() + + "}"; + } + + static String allPrimitivesPayload() { + return "{" + + BOOL_PAYLOAD + + "," + + INT64_PAYLOAD + + "," + + INT32_PAYLOAD + + "," + + INT16_PAYLOAD + + "," + + INT8_PAYLOAD + + "," + + FLOAT_PAYLOAD + + "," + + DOUBLE_PAYLOAD + + "," + + STRING_PAYLOAD + + "}"; + } + + /** + * Schemas and payload for {@link + * IcebergIngestionSchemaEvolutionIT#testEvolutionOfComplexTypes_withSchema} test + */ + static String objectVarcharWithSchema() { + return "{ " + + " \"schema\": { " + + " \"type\": \"struct\", " + + " \"fields\": [ " + + " { " + + " \"field\": \"object\", " + + " \"type\": \"struct\", " + + " \"fields\": [ " + + " { " + + " \"field\": \"test_string\", " + + " \"type\": \"string\" " + + " } " + + " ] " + + " } " + + " ] " + + " }, " + + " \"payload\": " + + objectVarcharPayload + + "}"; + } + + static String objectVarcharPayload = "{ \"object\": { \"test_string\": \"very long string\" }} "; + + static String objectWithNestedObjectWithSchema() { + return "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"nested_object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + " }" + + " ]" + + " }" + + " ]" + + " }," + + " \"payload\": " + + objectWithNestedObjectPayload() + + "}"; + } + + static String objectWithNestedObjectPayload() { + return "{" + + " \"object\": {" + + " \"test_string\": \"very long string\"," + + " \"nested_object\": {" + + " \"test_string\": \"pretty string\"" + + " }" + + " }" + + " }"; + } + + static String twoObjectsWithSchema() { + return "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"nested_object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + " }" + + " ]" + + " }," + + " {" + + " \"field\": \"object_With_Nested_Objects\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"nestedStruct\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + " }" + + " ]" + + " }" + + " ]" + + " }," + + " \"payload\": " + + twoObjectsWithSchemaPayload() + + "}"; + } + + static String twoObjectsWithSchemaPayload() { + return "{" + + " \"object\": {" + + " \"test_string\": \"very long string\"," + + " \"nested_object\": {" + + " \"test_string\": \"pretty string\"" + + " }" + + " }," + + " \"object_With_Nested_Objects\": {" + + " \"nestedStruct\": {" + + " \"description\": \"txt\"" + + " }" + + " }" + + " }"; + } + + static String twoObjectsExtendedWithMapAndArrayWithSchema() { + return "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"nested_object\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + " }," + + " {" + + " \"field\": \"Test_Map\"," + + " \"type\": \"map\"," + + " \"keys\": {" + + " \"type\": \"string\"" + + " }," + + " \"values\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"test_string\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + " }" + + " }" + + " ]" + + " }," + + " {" + + " \"field\": \"object_With_Nested_Objects\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"nestedStruct\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"test_array\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"double\"" + + " }" + + " }" + + " ]" + + " }" + + " ]" + + " }" + + " ]" + + " }," + + " \"payload\": " + + twoObjectsExtendedWithMapAndArrayPayload() + + "}"; + } + + static String twoObjectsExtendedWithMapAndArrayPayload() { + return "{" + + " \"object\": {" + + " \"test_string\": \"very long string\"," + + " \"nested_object\": {" + + " \"test_string\": \"pretty string\"" + + " }," + + " \"Test_Map\": {" + + " \"key1\": {" + + " \"test_string\": \"value string\"" + + " }" + + " }" + + " }," + + " \"object_With_Nested_Objects\": {" + + " \"nestedStruct\": {" + + " \"description\": \"txt\"," + + " \"test_array\": [" + + " 1.2," + + " 323.4," + + " 3.14" + + " ]" + + " }" + + " }" + + " }"; + } + + static String BOOL_SCHEMA = " { \"field\" : \"test_boolean\", \"type\" : \"boolean\"} "; + + static String INT64_SCHEMA = "{ \"field\" : \"test_int64\", \"type\" : \"int64\" }"; + static String INT32_SCHEMA = "{ \"field\" : \"test_int32\", \"type\" : \"int32\" }"; + static String INT16_SCHEMA = "{ \"field\" : \"test_int16\", \"type\" : \"int16\" }"; + static String INT8_SCHEMA = "{ \"field\" : \"test_int8\", \"type\" : \"int8\" }"; + + static String FLOAT_SCHEMA = "{ \"field\" : \"test_float\", \"type\" : \"float\" }"; + + static String DOUBLE_SCHEMA = "{ \"field\" : \"test_double\", \"type\" : \"double\" }"; + + static String STRING_SCHEMA = "{ \"field\" : \"test_string\", \"type\" : \"string\" }"; + + static final String BOOL_PAYLOAD = "\"test_boolean\" : true "; + static final String INT64_PAYLOAD = "\"test_int64\" : 2137324241343241 "; + static final String INT32_PAYLOAD = "\"test_int32\" : 2137 "; + static final String INT16_PAYLOAD = "\"test_int16\" : 2137 "; + static final String INT8_PAYLOAD = "\"test_int8\" : 2137 "; + static final String FLOAT_PAYLOAD = "\"test_float\" : 1939.30 "; + static final String DOUBLE_PAYLOAD = "\"test_double\" : 123.45793247859723 "; + static final String STRING_PAYLOAD = "\"test_string\" : \"very long string\" "; + + private static final String SCHEMA_BEGINNING = + "{ \"schema\": { \"type\": \"struct\", \"fields\": ["; + private static final String SCHEMA_END = "]},"; +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java index 6df73ded1..4a8637570 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java @@ -36,7 +36,7 @@ public class ComplexJsonRecord { PrimitiveJsonRecord.primitiveJsonRecordValueExample, null); - public static final String complexJsonExample = + public static final String complexJsonPayloadExample = "{" + " \"id_int8\": 8," + " \"id_int16\": 16," @@ -218,7 +218,7 @@ public class ComplexJsonRecord { + " \"name\": \"sf.kc.test\"" + " }," + " \"payload\": " - + complexJsonExample + + complexJsonPayloadExample + "}"; private static final ObjectMapper MAPPER = diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java deleted file mode 100644 index 010a73d21..000000000 --- a/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.snowflake.kafka.connector.streaming.schemaevolution.iceberg; - -import static org.junit.jupiter.params.provider.Arguments.arguments; - -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.ApacheIcebergColumnSchema; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergColumnTree; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergDataTypeParser; -import java.util.stream.Stream; -import org.apache.iceberg.types.Type; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -public class ParseIcebergColumnTreeTest { - - @ParameterizedTest - @MethodSource("icebergSchemas") - void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedQuery) { - // given - Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema); - // when - ApacheIcebergColumnSchema apacheSchema = - new ApacheIcebergColumnSchema(type, "TEST_COLUMN_NAME"); - IcebergColumnTree tree = new IcebergColumnTree(apacheSchema); - // then - Assertions.assertEquals(expectedQuery, tree.buildQuery()); - } - - static Stream icebergSchemas() { - return Stream.of( - // primitives - arguments("\"boolean\"", "TEST_COLUMN_NAME BOOLEAN"), - arguments("\"int\"", "TEST_COLUMN_NAME NUMBER(10,0)"), - arguments("\"long\"", "TEST_COLUMN_NAME NUMBER(19,0)"), - arguments("\"float\"", "TEST_COLUMN_NAME FLOAT"), - arguments("\"double\"", "TEST_COLUMN_NAME FLOAT"), - arguments("\"date\"", "TEST_COLUMN_NAME DATE"), - arguments("\"time\"", "TEST_COLUMN_NAME TIME(6)"), - arguments("\"timestamptz\"", "TEST_COLUMN_NAME TIMESTAMP_LTZ"), - arguments("\"timestamp\"", "TEST_COLUMN_NAME TIMESTAMP"), - arguments("\"string\"", "TEST_COLUMN_NAME VARCHAR(16777216)"), - arguments("\"uuid\"", "TEST_COLUMN_NAME BINARY(16)"), - arguments("\"binary\"", "TEST_COLUMN_NAME BINARY"), - arguments("\"decimal(10,5)\"", "TEST_COLUMN_NAME DECIMAL(10, 5)"), - // simple struct - arguments( - "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}", - "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"), - // list - arguments( - "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}", - "TEST_COLUMN_NAME ARRAY(NUMBER(19,0))"), - // map - arguments( - "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}", - "TEST_COLUMN_NAME MAP(NUMBER(10,0), VARCHAR(16777216))"), - // structs with nested objects - arguments( - "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},{\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[{\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},{\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}]}}]}", - "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object" - + " OBJECT(nested_key1 VARCHAR(16777216), nested_key2 VARCHAR(16777216)))"), - arguments( - "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n", - "TEST_COLUMN_NAME OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition" - + " NUMBER(10,0), key VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id" - + " NUMBER(10,0), CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0)," - + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216)," - + " VARCHAR(16777216)))")); - } -}