SNOW-1729292 modify iceberg tree based on record data (#1007)
sfc-gh-bzabek authored Dec 2, 2024
1 parent 487e7c4 commit 85db567
Showing 28 changed files with 1,840 additions and 290 deletions.
@@ -112,6 +112,14 @@ public interface SnowflakeConnectionService {
*/
void appendColumnsToTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

/**
* Alters an Iceberg table to modify column data types.
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from column names to their column infos
*/
void alterColumnsDataTypeIcebergTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

/**
* Alters an Iceberg table to add columns according to a map from column names to their types
*
@@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeConnectionV1;
import net.snowflake.client.jdbc.SnowflakeDriver;
import net.snowflake.client.jdbc.cloud.storage.StageInfo;
@@ -498,6 +499,39 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) {
return hasPermission;
}

/**
* Alters an Iceberg table to modify column data types.
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from column names to their column infos
*/
@Override
public void alterColumnsDataTypeIcebergTable(
String tableName, Map<String, ColumnInfos> columnInfosMap) {
LOGGER.debug("Modifying data types of iceberg table columns");
String alterSetDatatypeQuery = generateAlterSetDataTypeQuery(columnInfosMap);
executeStatement(tableName, alterSetDatatypeQuery);
}

private String generateAlterSetDataTypeQuery(Map<String, ColumnInfos> columnsToModify) {
StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg ");
setDataTypeQuery.append("table identifier(?) alter column ");

String columnsPart =
columnsToModify.entrySet().stream()
.map(
column -> {
String columnName = column.getKey();
String dataType = column.getValue().getColumnType();
return columnName + " set data type " + dataType;
})
.collect(Collectors.joining(", "));

setDataTypeQuery.append(columnsPart);

return setDataTypeQuery.toString();
}
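
For illustration, a minimal stand-alone sketch of the query shape this method produces (the column names and types below are made up, and plain strings stand in for the connector's ColumnInfos type); the single ? placeholder is bound to the table name later, in executeStatement:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class AlterQuerySketch {
  public static void main(String[] args) {
    // Hypothetical columns to retype; LinkedHashMap keeps a stable order for the demo.
    Map<String, String> columnsToModify = new LinkedHashMap<>();
    columnsToModify.put("AMOUNT", "NUMBER(38, 10)");
    columnsToModify.put("NOTE", "VARCHAR(16777216)");

    String columnsPart =
        columnsToModify.entrySet().stream()
            .map(e -> e.getKey() + " set data type " + e.getValue())
            .collect(Collectors.joining(", "));

    // Prints: alter iceberg table identifier(?) alter column AMOUNT set data type
    // NUMBER(38, 10), NOTE set data type VARCHAR(16777216)
    System.out.println("alter iceberg table identifier(?) alter column " + columnsPart);
  }
}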

/**
* Alters a table to add columns according to a map from column names to their types
*
@@ -552,18 +586,22 @@ private void appendColumnsToTable(
       logColumn.append(columnName).append(" (").append(columnInfosMap.get(columnName)).append(")");
     }
 
+    executeStatement(tableName, appendColumnQuery.toString());
+
+    logColumn.insert(0, "Following columns created for table {}:\n").append("]");
+    LOGGER.info(logColumn.toString(), tableName);
+  }
+
+  private void executeStatement(String tableName, String query) {
     try {
-      LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
-      PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
+      LOGGER.info("Trying to run query: {}", query);
+      PreparedStatement stmt = conn.prepareStatement(query);
       stmt.setString(1, tableName);
       stmt.execute();
       stmt.close();
     } catch (SQLException e) {
       throw SnowflakeErrors.ERROR_2015.getException(e);
     }
-
-    logColumn.insert(0, "Following columns created for table {}:\n").append("]");
-    LOGGER.info(logColumn.toString(), tableName);
   }
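
A note on identifier(?): Snowflake's IDENTIFIER() keyword accepts an object name as a bind variable, so the table name is bound at execution time instead of being concatenated into the SQL text. A minimal sketch with a hypothetical table name (assumes an open JDBC Connection conn):

try (PreparedStatement stmt =
    conn.prepareStatement(
        "alter iceberg table identifier(?) alter column AMOUNT set data type NUMBER(38, 10)")) {
  stmt.setString(1, "MY_ICEBERG_TABLE"); // hypothetical table name bound to identifier(?)
  stmt.execute();
}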

/**
Expand Down
@@ -339,7 +339,8 @@ public enum SnowflakeErrors {
"Timeout while waiting for file cleaner to start",
"Could not allocate thread for file cleaner to start processing in given time. If problem"
+ " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"),
-  ;
+  ERROR_5025(
+      "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution.");

// properties

@@ -684,7 +684,8 @@ public InsertRowsResponse get() throws Throwable {
LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems,
-            this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
+            this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
+            channel.getTableSchema());
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
@@ -51,10 +51,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
-import net.snowflake.ingest.streaming.InsertValidationResponse;
-import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
+import net.snowflake.ingest.streaming.*;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
@@ -539,7 +536,8 @@ private void handleInsertRowFailure(
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
-      schemaEvolutionService.evolveSchemaIfNeeded(schemaEvolutionTargetItems, kafkaSinkRecord);
+      schemaEvolutionService.evolveSchemaIfNeeded(
+          schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
return;
@@ -1,5 +1,7 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;

import java.util.Map;
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;

public interface SchemaEvolutionService {
@@ -11,6 +13,10 @@ public interface SchemaEvolutionService {
* @param targetItems target items for schema evolution such as table name, columns to drop
* nullability, and columns to add
* @param record the sink record that contains the schema and actual data
* @param existingSchema the existing table schema stored in the channel
*/
-  void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record);
+  void evolveSchemaIfNeeded(
+      SchemaEvolutionTargetItems targetItems,
+      SinkRecord record,
+      Map<String, ColumnProperties> existingSchema);
}
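
To make the widened contract concrete, a hypothetical implementation sketch (illustration only, not a class from this commit):

import java.util.Map;
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;

class LoggingSchemaEvolutionService implements SchemaEvolutionService {
  @Override
  public void evolveSchemaIfNeeded(
      SchemaEvolutionTargetItems targetItems,
      SinkRecord record,
      Map<String, ColumnProperties> existingSchema) {
    // The caller now supplies the channel's current schema, so an implementation
    // can compare the record's fields against existing column types before altering.
    System.out.printf(
        "Would evolve %s; channel already knows columns %s%n",
        targetItems, existingSchema.keySet());
  }
}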
@@ -0,0 +1,26 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;

class IcebergColumnJsonValuePair {
private final String columnName;
private final JsonNode jsonNode;

static IcebergColumnJsonValuePair from(Map.Entry<String, JsonNode> field) {
return new IcebergColumnJsonValuePair(field.getKey(), field.getValue());
}

IcebergColumnJsonValuePair(String columnName, JsonNode jsonNode) {
this.columnName = columnName;
this.jsonNode = jsonNode;
}

String getColumnName() {
return columnName;
}

JsonNode getJsonNode() {
return jsonNode;
}
}
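
A hypothetical usage sketch (payload made up): splitting a record's JSON into per-column pairs that schema evolution can inspect one column at a time:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

class PairSketch {
  public static void main(String[] args) throws Exception {
    JsonNode record = new ObjectMapper().readTree("{\"id\": 1, \"name\": \"a\"}");
    List<IcebergColumnJsonValuePair> pairs = new ArrayList<>();
    // JsonNode.fields() iterates Map.Entry<String, JsonNode>, matching the from() factory.
    record.fields().forEachRemaining(f -> pairs.add(IcebergColumnJsonValuePair.from(f)));
    pairs.forEach(p -> System.out.println(p.getColumnName() + " -> " + p.getJsonNode()));
  }
}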
@@ -3,22 +3,22 @@
import org.apache.iceberg.types.Type;

/** Wrapper class for Iceberg schema retrieved from channel. */
-public class ApacheIcebergColumnSchema {
+class IcebergColumnSchema {
 
   private final Type schema;
 
   private final String columnName;
 
-  public ApacheIcebergColumnSchema(Type schema, String columnName) {
+  IcebergColumnSchema(Type schema, String columnName) {
     this.schema = schema;
-    this.columnName = columnName.toUpperCase();
+    this.columnName = columnName;
   }
 
-  public Type getSchema() {
+  Type getSchema() {
     return schema;
   }
 
-  public String getColumnName() {
+  String getColumnName() {
     return columnName;
   }
}
@@ -1,16 +1,19 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

/** Class with object types compatible with Snowflake Iceberg table */
-public class IcebergColumnTree {
+class IcebergColumnTree {
 
   private final IcebergFieldNode rootNode;
 
-  public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
-    this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
-  }
+  String getColumnName() {
+    return rootNode.name;
+  }
 
-  public String buildQuery() {
-    StringBuilder sb = new StringBuilder();
-    return rootNode.buildQuery(sb, "ROOT_NODE").toString();
-  }
+  IcebergFieldNode getRootNode() {
+    return rootNode;
+  }
+
+  IcebergColumnTree(IcebergFieldNode rootNode) {
+    this.rootNode = rootNode;
+  }
}
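
A construction sketch for the reworked tree (assumes IcebergFieldNode still exposes the (String name, Type type) constructor used by the previous revision; the column name and type are made up):

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class TreeSketch {
  public static void main(String[] args) {
    // The tree is now built from a ready root node instead of a channel schema.
    Type decimal = Types.DecimalType.of(38, 10); // hypothetical column type
    IcebergFieldNode root = new IcebergFieldNode("AMOUNT", decimal);
    IcebergColumnTree tree = new IcebergColumnTree(root);
    System.out.println(tree.getColumnName()); // prints AMOUNT
  }
}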
