diff --git a/LICENSE b/LICENSE
index 8ffd93214c..3295300aa5 100644
--- a/LICENSE
+++ b/LICENSE
@@ -231,6 +231,8 @@ This product includes code from Apache Iceberg.
 * org.apache.iceberg.spark.data.ParquetWithSparkSchemaVisitor copied and modified to org.apache.iceberg.spark.data.AdaptHiveParquetWithSparkSchemaVisitor
 * org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor copied and modified to org.apache.iceberg.flink.data.AdaptHiveParquetWithFlinkSchemaVisitor
 * org.apache.iceberg.parquet.ParquetUtil copied and modified to org.apache.iceberg.parquet.AdaptHiveParquetUtil
+* org.apache.paimon.spark.SparkTypeUtils copied and modified to com.netease.arctic.server.dashboard.component.reverser.PaimonTypeToSparkType
+* org.apache.iceberg.spark.TypeToSparkType copied and modified to com.netease.arctic.server.dashboard.component.reverser.IcebergTypeToSparkType
diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/MixedAndIcebergTableDescriptor.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/MixedAndIcebergTableDescriptor.java
index 2c9b48753a..7173216113 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -23,6 +23,8 @@
 import com.netease.arctic.data.DataFileType;
 import com.netease.arctic.data.FileNameRules;
 import com.netease.arctic.op.SnapshotSummary;
+import com.netease.arctic.server.dashboard.component.reverser.DDLReverser;
+import com.netease.arctic.server.dashboard.component.reverser.IcebergTableMetaExtract;
 import com.netease.arctic.server.dashboard.model.AMSColumnInfo;
 import com.netease.arctic.server.dashboard.model.AMSPartitionField;
 import com.netease.arctic.server.dashboard.model.DDLInfo;
@@ -44,13 +46,11 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.InternalRecordWrapper;
@@ -66,10 +66,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -230,62 +228,16 @@ public List getTransactionDetail(AmoroTable amoroTable
   public List getTableOperations(AmoroTable amoroTable) {
     ArcticTable arcticTable = getTable(amoroTable);
-    List result = new ArrayList<>();
     Table table;
     if (arcticTable.isKeyedTable()) {
       table = arcticTable.asKeyedTable().baseTable();
     } else {
       table = arcticTable.asUnkeyedTable();
     }
-    List snapshotLog = ((HasTableOperations) table).operations().current().snapshotLog();
-    List metadataLogEntries =
-        ((HasTableOperations) table).operations().current().previousFiles();
-    Set time = new HashSet<>();
-    snapshotLog.forEach(e -> time.add(e.timestampMillis()));
-    String lastMetadataLogEntryFile = null;
-    org.apache.iceberg.TableMetadata lastTableMetadata = null;
-    for (int i = 1; i < metadataLogEntries.size(); i++) {
-      org.apache.iceberg.TableMetadata.MetadataLogEntry currentEntry =
metadataLogEntries.get(i); - if (!time.contains(currentEntry.timestampMillis())) { - org.apache.iceberg.TableMetadata.MetadataLogEntry previousEntry = metadataLogEntries.get(i - 1); - org.apache.iceberg.TableMetadata oldTableMetadata; - if (lastMetadataLogEntryFile == null || !lastMetadataLogEntryFile.equals(previousEntry.file())) { - oldTableMetadata = TableMetadataParser.read(table.io(), previousEntry.file()); - } else { - oldTableMetadata = lastTableMetadata; - } - - org.apache.iceberg.TableMetadata - newTableMetadata = TableMetadataParser.read(table.io(), currentEntry.file()); - lastMetadataLogEntryFile = currentEntry.file(); - lastTableMetadata = newTableMetadata; - - DDLInfo.Generator generator = new DDLInfo.Generator(); - result.addAll(generator.tableIdentify(arcticTable.id()) - .oldMeta(oldTableMetadata) - .newMeta(newTableMetadata) - .generate()); - } - } - if (metadataLogEntries.size() > 0) { - org.apache.iceberg.TableMetadata.MetadataLogEntry previousEntry = metadataLogEntries - .get(metadataLogEntries.size() - 1); - org.apache.iceberg.TableMetadata oldTableMetadata; - if (lastMetadataLogEntryFile == null || !lastMetadataLogEntryFile.equals(previousEntry.file())) { - oldTableMetadata = TableMetadataParser.read(table.io(), previousEntry.file()); - } else { - oldTableMetadata = lastTableMetadata; - } - - org.apache.iceberg.TableMetadata newTableMetadata = ((HasTableOperations) table).operations().current(); - DDLInfo.Generator generator = new DDLInfo.Generator(); - result.addAll(generator.tableIdentify(arcticTable.id()) - .oldMeta(oldTableMetadata) - .newMeta(newTableMetadata) - .generate()); - } - return result; + IcebergTableMetaExtract extract = new IcebergTableMetaExtract(); + DDLReverser ddlReverser = new DDLReverser<>(extract); + return ddlReverser.reverse(table, amoroTable.id()); } @Override diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/PaimonTableDescriptor.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/PaimonTableDescriptor.java index 645f2a4924..b51ff73a84 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/PaimonTableDescriptor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/PaimonTableDescriptor.java @@ -20,12 +20,15 @@ import com.netease.arctic.AmoroTable; import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.server.dashboard.component.reverser.DDLReverser; +import com.netease.arctic.server.dashboard.component.reverser.PaimonTableMetaExtract; import com.netease.arctic.server.dashboard.model.DDLInfo; import com.netease.arctic.server.dashboard.model.PartitionBaseInfo; import com.netease.arctic.server.dashboard.model.PartitionFileBaseInfo; import com.netease.arctic.server.dashboard.model.ServerTableMeta; import com.netease.arctic.server.dashboard.model.TransactionsOfTable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.paimon.table.DataTable; import java.util.List; @@ -55,7 +58,10 @@ public List getTransactionDetail(AmoroTable amoroTable @Override public List getTableOperations(AmoroTable amoroTable) { - throw new UnsupportedOperationException(); + DataTable table = getTable(amoroTable); + PaimonTableMetaExtract extract = new PaimonTableMetaExtract(); + DDLReverser ddlReverser = new DDLReverser<>(extract); + return ddlReverser.reverse(table, amoroTable.id()); } @Override @@ -67,4 +73,8 @@ public List getTablePartition(AmoroTable amoroTable) { public List getTableFile(AmoroTable amoroTable, String partition) { throw 
new UnsupportedOperationException();
   }
+
+  private DataTable getTable(AmoroTable amoroTable) {
+    return (DataTable) amoroTable.originalTable();
+  }
 }
diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/DDLReverser.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/DDLReverser.java
new file mode 100644
index 0000000000..e9f55e9897
--- /dev/null
+++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/DDLReverser.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.dashboard.component.reverser;
+
+import com.google.common.collect.Maps;
+import com.netease.arctic.server.dashboard.model.DDLInfo;
+import com.netease.arctic.table.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DDLReverser {
+
+  private final TableMetaExtract tableMetaExtract;
+
+  public DDLReverser(TableMetaExtract tableMetaExtract) {
+    this.tableMetaExtract = tableMetaExtract;
+  }
+
+  public List reverse(T table, TableIdentifier tableIdentifier) {
+
+    // Currently only Spark metadata changes are handled.
+    SparkMetadataChangeHandler metadataChangeHandler =
+        new SparkMetadataChangeHandler(tableIdentifier.getTableName());
+
+    List internalTableMetas = tableMetaExtract.extractTable(table);
+    if (internalTableMetas.size() < 2) {
+      return Collections.emptyList();
+    }
+
+    List result = new ArrayList<>();
+
+    for (int i = 1; i < internalTableMetas.size(); i++) {
+      TableMetaExtract.InternalTableMeta pre = internalTableMetas.get(i - 1);
+      TableMetaExtract.InternalTableMeta current = internalTableMetas.get(i);
+
+      compareProperties(pre.getProperties(), current.getProperties(), metadataChangeHandler)
+          .forEach(sql -> result.add(
+              DDLInfo.of(tableIdentifier, sql, DDLInfo.DDLType.UPDATE_PROPERTIES, current.getTime()))
+          );
+
+      compareSchemas(pre.getInternalSchema(), current.getInternalSchema(), metadataChangeHandler)
+          .forEach(sql -> result.add(
+              DDLInfo.of(tableIdentifier, sql, DDLInfo.DDLType.UPDATE_SCHEMA, current.getTime()))
+          );
+    }
+    return result;
+  }
+
+  private List compareProperties(
+      Map pre,
+      Map current,
+      MetadataChangeHandler metadataChangeHandler) {
+    // Although only one SQL statement can be executed at a time,
+    // using the Java API to make modifications can result in the effect of multiple SQL statements.
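+    // For example, one metadata change that removes property 'k1' and changes 'k2' yields two
+    // statements: ALTER TABLE t UNSET TBLPROPERTIES ('k1') and ALTER TABLE t SET TBLPROPERTIES ('k2' = 'v2').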
+ List result = new ArrayList<>(); + + // Remove properties + Set removeProperties = Sets.difference(pre.keySet(), current.keySet()); + if (!removeProperties.isEmpty()) { + result.add(metadataChangeHandler.removeProperties(removeProperties)); + } + + // Change and add properties + Map changeAndAddProperties = Maps.newHashMap(); + for (Map.Entry currentEntry : current.entrySet()) { + String key = currentEntry.getKey(); + String value = currentEntry.getValue(); + if (pre.containsKey(key)) { + if (!pre.get(key).equals(value)) { + changeAndAddProperties.put(key, value); + } + } else { + changeAndAddProperties.put(key, value); + } + } + if (!changeAndAddProperties.isEmpty()) { + result.add(metadataChangeHandler.changeAndAddProperties(changeAndAddProperties)); + } + + return result; + } + + private List compareSchemas(List pre, + List current, MetadataChangeHandler metadataChangeHandler) { + // Although only one SQL statement can be executed at a time, + // using the Java API to make modifications can result in the effect of multiple SQL statements. + List result = new ArrayList<>(); + + Map preMap = pre.stream().collect( + Collectors.toMap(TableMetaExtract.InternalSchema::getId, v -> v)); + Map currentMap = current.stream().collect(Collectors.toMap( + TableMetaExtract.InternalSchema::getId, v -> v)); + + //Remove columns. + Set removeColumns = Sets.difference(preMap.keySet(), currentMap.keySet()); + Set normalizedRemoveColumns = removeColumns.stream() + .filter(s -> !removeColumns.contains(preMap.get(s).getParentId())) + .collect(Collectors.toSet()); + if (!normalizedRemoveColumns.isEmpty()) { + result.add( + metadataChangeHandler + .dropColumns( + normalizedRemoveColumns + .stream() + .map(preMap::get) + .map(TableMetaExtract.InternalSchema::getName) + .collect(Collectors.toSet()) + ) + ); + } + + //Add new columns. + Set newColumns = Sets.difference(currentMap.keySet(), preMap.keySet()); + Set normalizedNewColumns = newColumns.stream() + .filter(s -> !newColumns.contains(currentMap.get(s).getParentId())) + .collect(Collectors.toSet()); + if (!normalizedNewColumns.isEmpty()) { + result.add( + metadataChangeHandler + .addNewColumns( + normalizedNewColumns + .stream() + .map(currentMap::get) + .collect(Collectors.toList()) + ) + ); + } + + //Change columns. 
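+    // For columns present in both versions, each difference in name, type, comment or nullability
+    // below is rendered as its own RENAME COLUMN or ALTER COLUMN statement.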
+ for (TableMetaExtract.InternalSchema currentSchema : current) { + if (!preMap.containsKey(currentSchema.getId())) { + continue; + } + + TableMetaExtract.InternalSchema preSchema = preMap.get(currentSchema.getId()); + if (!Objects.equals(preSchema.getName(), currentSchema.getName())) { + result.add(metadataChangeHandler.renameColumnName(preSchema.getName(), currentSchema.getName())); + } + if (!Objects.equals(preSchema.getType(), currentSchema.getType())) { + result.add(metadataChangeHandler.changeColumnType(currentSchema.getName(), currentSchema.getType())); + } + if (!Objects.equals(preSchema.getComment(), currentSchema.getComment())) { + result.add(metadataChangeHandler.changeColumnsComment(currentSchema.getName(), currentSchema.getComment())); + } + if (preSchema.isRequired() != currentSchema.isRequired()) { + result.add(metadataChangeHandler.changeColumnsRequire(currentSchema.getName(), currentSchema.isRequired())); + } + } + + return result; + } +} diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/IcebergTableMetaExtract.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/IcebergTableMetaExtract.java new file mode 100644 index 0000000000..c014356771 --- /dev/null +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/IcebergTableMetaExtract.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.server.dashboard.component.reverser; + +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * TableMetaExtract for iceberg table. + */ +public class IcebergTableMetaExtract implements TableMetaExtract
{ + + @Override + public List extractTable(Table table) { + List metadataLogEntries = + ((HasTableOperations) table).operations().current().previousFiles(); + + List metadataList = metadataLogEntries.stream() + .map(entry -> TableMetadataParser.read(table.io(), entry.file())) + .collect(Collectors.toList()); + + metadataList.add(((HasTableOperations) table).operations().current()); + + return metadataList.stream() + .map(metadata -> new InternalTableMeta( + metadata.lastUpdatedMillis(), + transform(metadata.schema()), + metadata.properties())) + .collect(Collectors.toList()); + } + + private List transform(Schema schema) { + return transform(null, new ArrayList<>(), schema.asStruct()); + } + + private List transform(Integer parent, List parentName, Type type) { + List result = new ArrayList<>(); + if (type.isStructType()) { + for (Types.NestedField field : type.asStructType().fields()) { + List name = Lists.newArrayList(parentName); + name.add(field.name()); + Type fieldType = field.type(); + result.add( + new InternalSchema( + field.fieldId(), + parent, + formatName(name), + dateTypeToSparkString(fieldType), + field.doc(), + field.isRequired() + )); + result.addAll(transform(field.fieldId(), name, field.type())); + } + } else if (type.isListType()) { + List name = Lists.newArrayList(parentName); + name.add("element"); + Type elementType = type.asListType().elementType(); + result.add( + new InternalSchema( + type.asListType().elementId(), + parent, + formatName(name), + dateTypeToSparkString(elementType), + "", + false + ) + ); + result.addAll(transform(type.asListType().elementId(), name, type.asListType().elementType())); + } else if (type.isMapType()) { + List keyName = Lists.newArrayList(parentName); + keyName.add("key"); + Type keyType = type.asMapType().keyType(); + int keyId = type.asMapType().keyId(); + result.add(new InternalSchema( + keyId, + parent, + formatName(keyName), + dateTypeToSparkString(keyType), + "", + false + )); + result.addAll(transform(keyId, keyName, keyType)); + + List valueName = Lists.newArrayList(parentName); + valueName.add("value"); + Type valueType = type.asMapType().valueType(); + int valueId = type.asMapType().valueId(); + result.add(new InternalSchema( + valueId, + parent, + formatName(valueName), + dateTypeToSparkString(valueType), + "", + false + )); + result.addAll(transform(valueId, valueName, valueType)); + } + return result; + } + + private String formatName(List names) { + return String.join(".", names); + } + + private String dateTypeToSparkString(Type type) { + return TypeUtil.visit(type, new IcebergTypeToSparkType()).catalogString(); + } +} diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/IcebergTypeToSparkType.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/IcebergTypeToSparkType.java new file mode 100644 index 0000000000..740af9b95d --- /dev/null +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/IcebergTypeToSparkType.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netease.arctic.server.dashboard.component.reverser; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.types.ArrayType$; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType$; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType$; +import org.apache.spark.sql.types.DecimalType$; +import org.apache.spark.sql.types.DoubleType$; +import org.apache.spark.sql.types.FloatType$; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.MapType$; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType$; + +import java.util.List; + +/** + * Copy from org.apache.iceberg.spark.TypeToSparkType. + * Reason: + * 1. org.apache.iceberg.spark.TypeToSparkType is package private. + * 2. iceberg-spark dependency is runtime scope, so we can't use it in compile scope. + */ +public class IcebergTypeToSparkType extends TypeUtil.SchemaVisitor { + public IcebergTypeToSparkType() { + } + + @Override + public DataType schema(Schema schema, DataType structType) { + return structType; + } + + @Override + public DataType struct(Types.StructType struct, List fieldResults) { + List fields = struct.fields(); + + List sparkFields = Lists.newArrayListWithExpectedSize(fieldResults.size()); + for (int i = 0; i < fields.size(); i += 1) { + Types.NestedField field = fields.get(i); + DataType type = fieldResults.get(i); + StructField sparkField = + StructField.apply(field.name(), type, field.isOptional(), Metadata.empty()); + if (field.doc() != null) { + sparkField = sparkField.withComment(field.doc()); + } + sparkFields.add(sparkField); + } + + return StructType$.MODULE$.apply(sparkFields); + } + + @Override + public DataType field(Types.NestedField field, DataType fieldResult) { + return fieldResult; + } + + @Override + public DataType list(Types.ListType list, DataType elementResult) { + return ArrayType$.MODULE$.apply(elementResult, list.isElementOptional()); + } + + @Override + public DataType map(Types.MapType map, DataType keyResult, DataType valueResult) { + return MapType$.MODULE$.apply(keyResult, valueResult, map.isValueOptional()); + } + + @Override + public DataType primitive(Type.PrimitiveType primitive) { + switch (primitive.typeId()) { + case BOOLEAN: + return BooleanType$.MODULE$; + case INTEGER: + return IntegerType$.MODULE$; + case LONG: + return LongType$.MODULE$; + case FLOAT: + return FloatType$.MODULE$; + case DOUBLE: + return DoubleType$.MODULE$; + case DATE: + return DateType$.MODULE$; + case TIME: + throw new UnsupportedOperationException("Spark does not support time fields"); + case TIMESTAMP: + return TimestampType$.MODULE$; + case 
STRING:
+        return StringType$.MODULE$;
+      case UUID:
+        // use String
+        return StringType$.MODULE$;
+      case FIXED:
+        return BinaryType$.MODULE$;
+      case BINARY:
+        return BinaryType$.MODULE$;
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) primitive;
+        return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
+      default:
+        throw new UnsupportedOperationException(
+            "Cannot convert unknown type to Spark: " + primitive);
+    }
+  }
+}
diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/MetadataChangeHandler.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/MetadataChangeHandler.java
new file mode 100644
index 0000000000..66191b5267
--- /dev/null
+++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/MetadataChangeHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.dashboard.component.reverser;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An interface for reverse-engineering the original DDL statements from changes to the table metadata.
+ */
+interface MetadataChangeHandler {
+
+  /**
+   * Change and add properties.
+   * @param diffProperties the properties to be changed and added
+   * @return DDL statement
+   */
+  String changeAndAddProperties(Map diffProperties);
+
+  /**
+   * Remove properties.
+   * @param removeKeys the properties to be removed
+   * @return DDL statement
+   */
+  String removeProperties(Set removeKeys);
+
+  /**
+   * Add new columns.
+   * @param newSchemas the new columns to be added
+   * @return DDL statement
+   */
+  String addNewColumns(List newSchemas);
+
+  /**
+   * Rename a column.
+   * @param oldName the old column name
+   * @param newName the new column name
+   * @return DDL statement
+   */
+  String renameColumnName(String oldName, String newName);
+
+  /**
+   * Drop columns.
+   * @param dropColumns the columns to be dropped
+   * @return DDL statement
+   */
+  String dropColumns(Set dropColumns);
+
+  /**
+   * Change a column's nullability.
+   * @param columnName the column name
+   * @param required true if the column is required, false otherwise
+   * @return DDL statement
+   */
+  String changeColumnsRequire(String columnName, boolean required);
+
+  /**
+   * Change a column's comment.
+   * @param columnName the column name
+   * @param comment the comment of the column
+   * @return DDL statement
+   */
+  String changeColumnsComment(String columnName, String comment);
+
+  /**
+   * Change a column's type.
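+   * For example, the Spark handler renders this as {@code ALTER TABLE t ALTER COLUMN c TYPE bigint}.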
+   * @param columnName the column name
+   * @param newType the new type of column
+   * @return DDL statement
+   */
+  String changeColumnType(String columnName, String newType);
+}
diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/PaimonTableMetaExtract.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/PaimonTableMetaExtract.java
new file mode 100644
index 0000000000..ac5f19431a
--- /dev/null
+++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/PaimonTableMetaExtract.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.dashboard.component.reverser;
+
+import com.google.common.collect.Lists;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * TableMetaExtract for paimon table.
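+ * Lists every historical schema through Paimon's SchemaManager, sorted by commit time,
+ * and converts each one into an InternalTableMeta for DDLReverser to diff.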
+ */ +public class PaimonTableMetaExtract implements TableMetaExtract { + + @Override + public List extractTable(DataTable table) { + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + List tableSchemas = schemaManager.listAll(); + return tableSchemas.stream() + .sorted((o1, o2) -> Long.compare(o1.timeMillis(), o2.timeMillis())) + .map(schema -> new InternalTableMeta( + schema.timeMillis(), + transform(schema.fields()), + schema.options())) + .collect(Collectors.toList()); + } + + private List transform(List fields) { + return transform(null, new ArrayList<>(), new RowType(fields)); + } + + private List transform(Integer parent, List parentName, DataType type) { + List result = new ArrayList<>(); + if (type instanceof RowType) { + RowType rowType = (RowType) type; + for (DataField field : rowType.getFields()) { + List name = Lists.newArrayList(parentName); + name.add(field.name()); + DataType fieldType = field.type(); + result.add( + new InternalSchema( + field.id(), + parent, + formatName(name), + dateTypeToSparkString(fieldType), + field.description(), + !field.type().isNullable() + )); + result.addAll(transform(field.id(), name, field.type())); + } + } else if (type instanceof ArrayType) { + ArrayType arrayType = (ArrayType) type; + List name = Lists.newArrayList(parentName); + name.add("element"); + DataType elementType = arrayType.getElementType(); + result.addAll(transform(parent, name, elementType)); + } else if (type instanceof MapType) { + MapType mapType = (MapType) type; + List keyName = Lists.newArrayList(parentName); + keyName.add("key"); + DataType keyType = mapType.getKeyType(); + result.addAll(transform(parent, keyName, keyType)); + + List valueName = Lists.newArrayList(parentName); + valueName.add("value"); + DataType valueType = mapType.getValueType(); + result.addAll(transform(parent, valueName, valueType)); + } + return result; + } + + private String formatName(List names) { + return String.join(".", names); + } + + private String dateTypeToSparkString(DataType dataType) { + return PaimonTypeToSparkType.fromPaimonType(dataType).catalogString(); + } +} \ No newline at end of file diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/PaimonTypeToSparkType.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/PaimonTypeToSparkType.java new file mode 100644 index 0000000000..cd0d0e69df --- /dev/null +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/PaimonTypeToSparkType.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.netease.arctic.server.dashboard.component.reverser;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.UserDefinedType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Copy from org.apache.paimon.spark.SparkTypeUtils.
+ * Reason:
+ * 1. The AMS server doesn't have a paimon-spark dependency.
+ */
+public class PaimonTypeToSparkType {
+
+  private PaimonTypeToSparkType() {
+  }
+
+  public static StructType fromPaimonRowType(RowType type) {
+    return (StructType) fromPaimonType(type);
+  }
+
+  public static DataType fromPaimonType(org.apache.paimon.types.DataType type) {
+    return type.accept(PaimonToSparkTypeVisitor.INSTANCE);
+  }
+
+  public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) {
+    return SparkToPaimonTypeVisitor.visit(dataType);
+  }
+
+  private static class PaimonToSparkTypeVisitor extends DataTypeDefaultVisitor {
+
+    private static final PaimonToSparkTypeVisitor INSTANCE = new PaimonToSparkTypeVisitor();
+
+    @Override
+    public DataType visit(CharType charType) {
+      return DataTypes.StringType;
+    }
+
+    @Override
+    public DataType visit(VarCharType varCharType) {
+      return DataTypes.StringType;
+    }
+
+    @Override
+    public DataType visit(BooleanType booleanType) {
+      return DataTypes.BooleanType;
+    }
+
+    @Override
+    public DataType visit(BinaryType binaryType) {
+      return DataTypes.BinaryType;
+    }
+
+    @Override
+    public DataType visit(VarBinaryType varBinaryType) {
+      return DataTypes.BinaryType;
+    }
+
+    @Override
+    public DataType visit(DecimalType decimalType) {
+      return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale());
+    }
+
+    @Override
+    public DataType visit(TinyIntType tinyIntType) {
+      return DataTypes.ByteType;
+    }
+
+    @Override
+    public DataType visit(SmallIntType smallIntType) {
+      return DataTypes.ShortType;
+    }
+
+    @Override
+    public DataType visit(IntType intType) {
+      return DataTypes.IntegerType;
+    }
+
+    @Override
+    public DataType visit(BigIntType bigIntType) {
+      return DataTypes.LongType;
+    }
+
+    @Override
+    public DataType visit(FloatType floatType) {
+      return DataTypes.FloatType;
+    }
+
+    @Override
+    public DataType visit(DoubleType doubleType) {
+      return DataTypes.DoubleType;
+    }
+
+    @Override
+    public DataType visit(DateType dateType) {
+      return DataTypes.DateType;
+    }
+
+    @Override
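+    // Both Paimon TIMESTAMP and TIMESTAMP WITH LOCAL TIME ZONE map to Spark's single TimestampType.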
public DataType visit(TimestampType timestampType) { + return DataTypes.TimestampType; + } + + @Override + public DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return DataTypes.TimestampType; + } + + @Override + public DataType visit(ArrayType arrayType) { + org.apache.paimon.types.DataType elementType = arrayType.getElementType(); + return DataTypes.createArrayType(elementType.accept(this), elementType.isNullable()); + } + + @Override + public DataType visit(MultisetType multisetType) { + return DataTypes.createMapType( + multisetType.getElementType().accept(this), DataTypes.IntegerType, false); + } + + @Override + public DataType visit(MapType mapType) { + return DataTypes.createMapType( + mapType.getKeyType().accept(this), + mapType.getValueType().accept(this), + mapType.getValueType().isNullable()); + } + + /** + * For simplicity, as a temporary solution, we directly convert the non-null attribute to + * nullable on the Spark side. + */ + @Override + public DataType visit(RowType rowType) { + List fields = new ArrayList<>(rowType.getFieldCount()); + for (DataField field : rowType.getFields()) { + StructField structField = + DataTypes.createStructField(field.name(), field.type().accept(this), true); + structField = + Optional.ofNullable(field.description()) + .map(structField::withComment) + .orElse(structField); + fields.add(structField); + } + return DataTypes.createStructType(fields); + } + + @Override + protected DataType defaultMethod(org.apache.paimon.types.DataType dataType) { + throw new UnsupportedOperationException("Unsupported type: " + dataType); + } + } + + private static class SparkToPaimonTypeVisitor { + + static org.apache.paimon.types.DataType visit(DataType type) { + return visit(type, new SparkToPaimonTypeVisitor()); + } + + static org.apache.paimon.types.DataType visit( + DataType type, SparkToPaimonTypeVisitor visitor) { + if (type instanceof StructType) { + StructField[] fields = ((StructType) type).fields(); + List fieldResults = + new ArrayList<>(fields.length); + + for (StructField field : fields) { + fieldResults.add(visit(field.dataType(), visitor)); + } + + return visitor.struct((StructType) type, fieldResults); + } else if (type instanceof org.apache.spark.sql.types.MapType) { + return visitor.map( + (org.apache.spark.sql.types.MapType) type, + visit(((org.apache.spark.sql.types.MapType) type).keyType(), visitor), + visit(((org.apache.spark.sql.types.MapType) type).valueType(), visitor)); + } else if (type instanceof org.apache.spark.sql.types.ArrayType) { + return visitor.array( + (org.apache.spark.sql.types.ArrayType) type, + visit( + ((org.apache.spark.sql.types.ArrayType) type).elementType(), + visitor)); + } else if (type instanceof UserDefinedType) { + throw new UnsupportedOperationException("User-defined types are not supported"); + } else { + return visitor.atomic(type); + } + } + + public org.apache.paimon.types.DataType struct( + StructType struct, List fieldResults) { + StructField[] fields = struct.fields(); + List newFields = new ArrayList<>(fields.length); + for (int i = 0; i < fields.length; i += 1) { + StructField field = fields[i]; + org.apache.paimon.types.DataType fieldType = + fieldResults.get(i).copy(field.nullable()); + String comment = field.getComment().getOrElse(() -> null); + newFields.add(new DataField(i, field.name(), fieldType, comment)); + } + + return new RowType(newFields); + } + + public org.apache.paimon.types.DataType array( + org.apache.spark.sql.types.ArrayType array, + 
org.apache.paimon.types.DataType elementResult) { + return new ArrayType(elementResult.copy(array.containsNull())); + } + + public org.apache.paimon.types.DataType map( + org.apache.spark.sql.types.MapType map, + org.apache.paimon.types.DataType keyResult, + org.apache.paimon.types.DataType valueResult) { + return new MapType(keyResult.copy(false), valueResult.copy(map.valueContainsNull())); + } + + public org.apache.paimon.types.DataType atomic(DataType atomic) { + if (atomic instanceof org.apache.spark.sql.types.BooleanType) { + return new BooleanType(); + } else if (atomic instanceof org.apache.spark.sql.types.ByteType) { + return new TinyIntType(); + } else if (atomic instanceof org.apache.spark.sql.types.ShortType) { + return new SmallIntType(); + } else if (atomic instanceof org.apache.spark.sql.types.IntegerType) { + return new IntType(); + } else if (atomic instanceof LongType) { + return new BigIntType(); + } else if (atomic instanceof org.apache.spark.sql.types.FloatType) { + return new FloatType(); + } else if (atomic instanceof org.apache.spark.sql.types.DoubleType) { + return new DoubleType(); + } else if (atomic instanceof org.apache.spark.sql.types.StringType) { + return new VarCharType(VarCharType.MAX_LENGTH); + } else if (atomic instanceof org.apache.spark.sql.types.VarcharType) { + return new VarCharType(((org.apache.spark.sql.types.VarcharType) atomic).length()); + } else if (atomic instanceof org.apache.spark.sql.types.CharType) { + return new CharType(((org.apache.spark.sql.types.CharType) atomic).length()); + } else if (atomic instanceof org.apache.spark.sql.types.DateType) { + return new DateType(); + } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) { + return new TimestampType(); + } else if (atomic instanceof org.apache.spark.sql.types.DecimalType) { + return new DecimalType( + ((org.apache.spark.sql.types.DecimalType) atomic).precision(), + ((org.apache.spark.sql.types.DecimalType) atomic).scale()); + } else if (atomic instanceof org.apache.spark.sql.types.BinaryType) { + return new VarBinaryType(VarBinaryType.MAX_LENGTH); + } + + throw new UnsupportedOperationException( + "Not a supported type: " + atomic.catalogString()); + } + } +} diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/SparkMetadataChangeHandler.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/SparkMetadataChangeHandler.java new file mode 100644 index 0000000000..be69c9c50a --- /dev/null +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/SparkMetadataChangeHandler.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netease.arctic.server.dashboard.component.reverser; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Translate to Spark's DDL syntax. + */ +public class SparkMetadataChangeHandler implements MetadataChangeHandler { + private static final String ALTER_TABLE = "ALTER TABLE %s"; + private static final String ADD_COLUMN = " ADD COLUMNS %s"; + private static final String ALTER_COLUMN = " ALTER COLUMN %s"; + private static final String RENAME_COLUMN = " RENAME COLUMN %s TO %s"; + private static final String DROP_COLUMNS = " DROP COLUMN %s"; + private static final String SET_PROPERTIES = " SET TBLPROPERTIES (%s)"; + private static final String UNSET_PROPERTIES = " UNSET TBLPROPERTIES (%s)"; + private static final String IS_OPTIONAL = " DROP NOT NULL"; + private static final String NOT_OPTIONAL = " SET NOT NULL"; + private static final String DOC = " COMMENT '%s'"; + private static final String TYPE = " TYPE %s"; + + private String tableName; + + public SparkMetadataChangeHandler(String tableName) { + this.tableName = tableName; + } + + @Override + public String changeAndAddProperties(Map diffProperties) { + String template = ALTER_TABLE + SET_PROPERTIES; + String properties = diffProperties + .entrySet() + .stream() + .map(entry -> String.format("'%s' = '%s'", entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",")); + return String.format(template, tableName, properties); + } + + @Override + public String removeProperties(Set removeKeys) { + String template = ALTER_TABLE + UNSET_PROPERTIES; + String properties = removeKeys + .stream() + .map(key -> String.format("'%s'", key)) + .collect(Collectors.joining(",")); + return String.format(template, tableName, properties); + } + + @Override + public String addNewColumns(List newSchemas) { + String template = ALTER_TABLE + ADD_COLUMN; + return String.format( + template, + tableName, + newSchemas.stream() + .map(TableMetaExtract.InternalSchema::columnString) + .collect(Collectors.joining(",", "(", ")"))); + } + + @Override + public String renameColumnName(String oldName, String newName) { + String template = ALTER_TABLE + RENAME_COLUMN; + return String.format(template, tableName, oldName, newName); + } + + @Override + public String dropColumns(Set dropColumns) { + String template = ALTER_TABLE + DROP_COLUMNS; + return String.format(template, tableName, String.join(",", dropColumns)); + } + + @Override + public String changeColumnsRequire(String columnName, boolean required) { + String template = ALTER_TABLE + ALTER_COLUMN + (required ? 
NOT_OPTIONAL : IS_OPTIONAL);
+    return String.format(template, tableName, columnName);
+  }
+
+  @Override
+  public String changeColumnsComment(String columnName, String comment) {
+    String template = ALTER_TABLE + ALTER_COLUMN + DOC;
+    return String.format(template, tableName, columnName, comment);
+  }
+
+  @Override
+  public String changeColumnType(String columnName, String newType) {
+    String template = ALTER_TABLE + ALTER_COLUMN + TYPE;
+    return String.format(template, tableName, columnName, newType);
+  }
+}
diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/TableMetaExtract.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/TableMetaExtract.java
new file mode 100644
index 0000000000..b469f7722d
--- /dev/null
+++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/component/reverser/TableMetaExtract.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.dashboard.component.reverser;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * An interface for returning the historical metadata of a table.
+ * The table can be iceberg, mixed-iceberg, mixed-hive, paimon and so on.
+ *
+ * @param Table type
+ */
+interface TableMetaExtract {
+
+  /**
+   * Extract the historical metadata of a table.
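+   * Implementations are expected to return the metas ordered by commit time, oldest first,
+   * since DDLReverser compares each meta against its immediate predecessor.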
+ */ + List extractTable(T table); + + class InternalTableMeta { + + private final long time; + + private final List internalSchemas; + + private final Map properties; + + public InternalTableMeta(long time, List internalSchemas, Map properties) { + this.time = time; + this.internalSchemas = internalSchemas; + this.properties = properties; + } + + public long getTime() { + return time; + } + + public List getInternalSchema() { + return internalSchemas; + } + + public Map getProperties() { + return properties; + } + } + + class InternalSchema { + private final int id; + + private final Integer parentId; + + private final String name; + + private final String type; + + private final String comment; + + private final boolean required; + + public InternalSchema( + int id, + @Nullable Integer parentId, + String name, + String type, + String comment, + boolean required) { + this.id = id; + this.parentId = parentId; + this.name = name; + this.type = type; + this.comment = comment; + this.required = required; + } + + public int getId() { + return id; + } + + public Integer getParentId() { + return parentId; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public String getComment() { + return comment; + } + + public boolean isRequired() { + return required; + } + + public String columnString() { + return String.format("%s %s", name, type); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("parentId", parentId) + .add("name", name) + .add("type", type) + .add("comment", comment) + .add("required", required) + .toString(); + } + } +} diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TableController.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TableController.java index a3fa432180..1e2a932b14 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TableController.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/controller/TableController.java @@ -363,7 +363,7 @@ public void getPartitionFileListInfo(Context ctx) { * * @param ctx - context for handling the request and response */ - public void getTableOperations(Context ctx) { + public void getTableOperations(Context ctx) throws Exception { String catalogName = ctx.pathParam("catalog"); String db = ctx.pathParam("db"); String tableName = ctx.pathParam("table"); diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/DDLInfo.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/DDLInfo.java index 9bbe0e98a2..7c5ff4cf99 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/DDLInfo.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/DDLInfo.java @@ -19,28 +19,17 @@ package com.netease.arctic.server.dashboard.model; import com.netease.arctic.table.TableIdentifier; -import com.netease.arctic.table.TableProperties; -import org.apache.iceberg.Schema; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.types.Types; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; public class DDLInfo { private TableIdentifier tableIdentifier; private String ddl; - private String ddlType; + private DDLType ddlType; private Long commitTime; public static DDLInfo of( TableIdentifier tableIdentifier, String ddl, 
-      String ddlType,
+      DDLType ddlType,
       Long commitTime) {
     DDLInfo ddlInfo = new DDLInfo();
     ddlInfo.setTableIdentifier(tableIdentifier);
@@ -66,11 +55,11 @@ public void setDdl(String ddl) {
     this.ddl = ddl;
   }
 
-  public String getDdlType() {
+  public DDLType getDdlType() {
     return ddlType;
  }
 
-  public void setDdlType(String ddlType) {
+  public void setDdlType(DDLType ddlType) {
     this.ddlType = ddlType;
   }
 
@@ -82,246 +71,8 @@ public void setCommitTime(Long commitTime) {
     this.commitTime = commitTime;
   }
 
-  public static class Generator {
-    private static final String DOT = ".";
-    private static final String ALTER_TABLE = "ALTER TABLE %s";
-    private static final String ADD_COLUMN = " ADD COLUMN ";
-    private static final String ALTER_COLUMN = " ALTER COLUMN %s";
-    private static final String MOVE_FIRST = " ALTER COLUMN %s FIRST";
-    private static final String MOVE_AFTER_COLUMN = " ALTER COLUMN %s AFTER %s";
-    private static final String RENAME_COLUMN = " RENAME COLUMN %s TO %s";
-    private static final String DROP_COLUMNS = " DROP COLUMN %s";
-    private static final String SET_PROPERTIES = " SET TBLPROPERTIES (%s)";
-    private static final String UNSET_PROPERTIES = " UNSET TBLPROPERTIES (%s)";
-    private static final String IS_OPTIONAL = " DROP NOT NULL";
-    private static final String NOT_OPTIONAL = " SET NOT NULL";
-    private static final String DOC = " COMMENT '%s'";
-    private static final String TYPE = " TYPE %s";
-    private TableMetadata oldMeta;
-    private TableMetadata newMeta;
-    private TableIdentifier tableIdentifier;
-
-    public Generator oldMeta(TableMetadata oldMeta) {
-      this.oldMeta = oldMeta;
-      return this;
-    }
-
-    public Generator newMeta(TableMetadata newMeta) {
-      this.newMeta = newMeta;
-      return this;
-    }
-
-    public Generator tableIdentify(TableIdentifier tableIdentifier) {
-      this.tableIdentifier = tableIdentifier;
-      return this;
-    }
-
-    public List<DDLInfo> generate() {
-      if (newMeta == null || tableIdentifier == null) {
-        throw new RuntimeException("tableIdentifier and newMeta must not be null");
-      }
-      List<DDLInfo> result = new ArrayList<>();
-      if (oldMeta == null) {
-        return result;
-      }
-      genNativeIcebergDDL(tableIdentifier, oldMeta, newMeta, result);
-      return result;
-    }
-
-    private void genNativeIcebergDDL(
-        TableIdentifier identifier, org.apache.iceberg.TableMetadata oldTableMetadata,
-        org.apache.iceberg.TableMetadata newTableMetadata, List<DDLInfo> result) {
-      if (oldTableMetadata.currentSchemaId() == newTableMetadata.currentSchemaId()) {
-        String ddl =
-            compareProperties(identifier.toString(), oldTableMetadata.properties(), newTableMetadata.properties());
-        if (!ddl.isEmpty()) {
-          result.add(DDLInfo.of(identifier, ddl, DDLType.UPDATE_PROPERTIES.name(),
-              newTableMetadata.lastUpdatedMillis()));
-        }
-      } else {
-        String ddl = compareSchema(
-            identifier.toString(),
-            oldTableMetadata.schema(),
-            newTableMetadata.schema());
-        if (!ddl.isEmpty()) {
-          result.add(DDLInfo.of(identifier, ddl, DDLType.UPDATE_SCHEMA.name(),
-              newTableMetadata.lastUpdatedMillis()));
-        }
-      }
-    }
-
-    private String compareProperties(String tableName, Map<String, String> before, Map<String, String> after) {
-      StringBuilder setSql = new StringBuilder();
-      StringBuilder unsetSql = new StringBuilder();
-      StringBuilder unsetPro = new StringBuilder();
-      int c = 0;
-      for (String oldPro : before.keySet()) {
-        if (TableProperties.WRITE_PROTECTED_PROPERTIES.contains(oldPro)) {
-          continue;
-        }
-        if (!after.containsKey(oldPro)) {
-          if (c > 0) {
-            unsetPro.append(",").append("\n");
-          }
-          unsetPro.append(String.format("'%s'", oldPro));
-          c++;
-        }
-      }
-      StringBuilder setPro = new StringBuilder();
-      int c1 = 0;
-      for (String newPro : after.keySet()) {
-        if (TableProperties.WRITE_PROTECTED_PROPERTIES.contains(newPro)) {
-          continue;
-        }
-        if (!after.get(newPro).equals(before.get(newPro))) {
-          if (c1 > 0) {
-            setPro.append(",").append("\n");
-          }
-          setPro.append(String.format("'%s'='%s'", newPro, after.get(newPro)));
-          c1++;
-        }
-      }
-      if (setPro.length() > 0) {
-        setSql.append(String.format(ALTER_TABLE, tableName)).append(String.format(SET_PROPERTIES, setPro));
-      }
-      if (unsetPro.length() > 0) {
-        unsetSql.append(String.format(ALTER_TABLE, tableName)).append(String.format(UNSET_PROPERTIES, unsetPro));
-      }
-      return setSql.append(unsetSql).toString();
-    }
-
-    private String compareSchema(String tableName, Schema before, Schema after) {
-      StringBuilder rs = new StringBuilder();
-      if (before == null) {
-        return "";
-      }
-
-      LinkedList<String> sortedBefore =
-          before.columns().stream().map(Types.NestedField::name).collect(Collectors.toCollection(LinkedList::new));
-      for (int i = 0; i < before.columns().size(); i++) {
-        Types.NestedField field = before.columns().get(i);
-        StringBuilder sb = new StringBuilder();
-        if (after.findField(field.fieldId()) == null) {
-          // drop col
-          sb.append(String.format(ALTER_TABLE, tableName));
-          sb.append(String.format(DROP_COLUMNS, field.name()));
-          sortedBefore.remove(field.name());
-        }
-        if (sb.length() > 0) {
-          rs.append(sb).append(";");
-          if (i < before.columns().size() - 1) {
-            rs.append("\n");
-          }
-        }
-      }
-
-      int maxIndex = 0;
-      for (int i = 0; i < before.columns().size(); i++) {
-        int index = after.columns().indexOf(before.columns().get(i));
-        maxIndex = Math.max(index, maxIndex);
-      }
-      for (int i = 0; i < after.columns().size(); i++) {
-        Types.NestedField field = after.columns().get(i);
-        StringBuilder sb = new StringBuilder();
-        if (before.findField(field.fieldId()) == null) {
-          // add col
-          sb.append(String.format(ALTER_TABLE, tableName));
-          sb.append(ADD_COLUMN);
-          sb.append(field.name()).append(" ");
-          sb.append(field.type().toString()).append(" ");
-          if (field.doc() != null) {
-            sb.append(String.format(DOC, field.doc()));
-          }
-          sortedBefore.add(i, field.name());
-          if (i == 0) {
-            sb.append(" FIRST");
-            sortedBefore.removeLast();
-            sortedBefore.addFirst(field.name());
-          } else if (i < maxIndex) {
-            sb.append(" AFTER ").append(sortedBefore.get(i - 1));
-            sortedBefore.removeLast();
-            sortedBefore.add(i, field.name());
-          }
-        } else if (!before.findField(field.fieldId()).equals(field)) {
-          sb.append(String.format(ALTER_TABLE, tableName));
-          //alter col
-          Types.NestedField oldField = before.findField(field.fieldId());
-          //rename
-          if (!oldField.name().equals(field.name())) {
-            sb.append(String.format(RENAME_COLUMN, oldField.name(), field.name()));
-          } else {
-            sb.append(String.format(ALTER_COLUMN, oldField.name()));
-
-            if (!oldField.type().equals(field.type())) {
-              if ((oldField.type() instanceof Types.MapType) && (field.type() instanceof Types.MapType)) {
-                sb = new StringBuilder();
-                sb.append(String.format(ALTER_TABLE, tableName)).append(compareFieldType(oldField, field));
-              } else {
-                sb.append(String.format(TYPE, field.type().toString()));
-              }
-            }
-
-            if (!Objects.equals(oldField.doc(), field.doc())) {
-              if (field.doc() != null) {
-                sb.append(String.format(DOC, field.doc()));
-              }
-            }
-
-            if (oldField.isOptional() != field.isOptional()) {
-              if (field.isOptional()) {
-                sb.append(IS_OPTIONAL);
-              } else {
-                sb.append(NOT_OPTIONAL);
-              }
-            }
-          }
-        } else if (i != before.columns().indexOf(field) && i != sortedBefore.indexOf(field.name())) {
-          sb.append(String.format(ALTER_TABLE, tableName));
-          if (i == 0) {
-            sb.append(String.format(MOVE_FIRST, field.name()));
-            sortedBefore.remove(field.name());
-            sortedBefore.addFirst(field.name());
-          } else {
-            sb.append(String.format(MOVE_AFTER_COLUMN, field.name(), after.columns().get(i - 1).name()));
-            sortedBefore.remove(field.name());
-            sortedBefore.add(i, field.name());
-          }
-        }
-        if (sb.length() > 0) {
-          rs.append(sb).append(";");
-          if (i < after.columns().size() - 1) {
-            rs.append("\n");
-          }
-        }
-      }
-      return rs.toString();
-    }
-
-    private StringBuilder compareFieldType(Types.NestedField oldField, Types.NestedField field) {
-      StringBuilder sb = new StringBuilder();
-
-      Types.StructType oldValueType = oldField.type().asMapType().valueType().asStructType();
-      List<Types.NestedField> oldValueFields = oldValueType.fields();
-      Types.StructType valueType = field.type().asMapType().valueType().asStructType();
-      List<Types.NestedField> valueFields = valueType.fields();
-      for (Types.NestedField old : oldValueFields) {
-        if (valueType.field(old.fieldId()) == null) {
-          //drop column
-          sb.append(String.format(DROP_COLUMNS, oldField.name() + DOT + old.name()));
-        }
-      }
-      for (Types.NestedField newF : valueFields) {
-        if (oldValueType.field(newF.fieldId()) == null) {
-          //add column
-          sb.append(ADD_COLUMN).append(field.name()).append(DOT).append(newF.name()).append(" ").append(newF.type());
-        }
-      }
-      return sb;
-    }
-
-    public enum DDLType {
-      UPDATE_SCHEMA,
-      UPDATE_PROPERTIES
-    }
+  public enum DDLType {
+    UPDATE_SCHEMA,
+    UPDATE_PROPERTIES
+  }
 }
diff --git a/ams/server/src/test/java/com/netease/arctic/server/catalog/TableCatalogTestBase.java b/ams/server/src/test/java/com/netease/arctic/server/catalog/TableCatalogTestBase.java
index 37814ffd5a..01a4b39858 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/catalog/TableCatalogTestBase.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/catalog/TableCatalogTestBase.java
@@ -42,7 +42,7 @@ public class TableCatalogTestBase extends TableServiceTestBase {
 
   private AmoroCatalog amoroCatalog;
 
-  private Object originalTableCatalog;
+  private Object originalCatalog;
 
   public TableCatalogTestBase(AmoroCatalogTestHelper amoroCatalogTestHelper) {
     this.amoroCatalogTestHelper = amoroCatalogTestHelper;
@@ -55,7 +55,7 @@ public void init() throws IOException {
     amoroCatalogTestHelper.initHiveConf(TEST_HMS.getHiveConf());
     this.amoroCatalog = amoroCatalogTestHelper.amoroCatalog();
     tableService().createCatalog(amoroCatalogTestHelper.getCatalogMeta());
-    this.originalTableCatalog = amoroCatalogTestHelper.originalCatalog();
+    this.originalCatalog = amoroCatalogTestHelper.originalCatalog();
   }
 
   @After
@@ -68,8 +68,8 @@ public AmoroCatalog getAmoroCatalog() {
     return amoroCatalog;
   }
 
-  public Object getOriginalTableCatalog() {
-    return originalTableCatalog;
+  public Object getOriginalCatalog() {
+    return originalCatalog;
   }
 
   public AmoroCatalogTestHelper getAmoroCatalogTestHelper() {
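With the hand-written `Generator` removed, `DDLInfo` becomes a plain value object and `DDLType` is promoted to a top-level member of the class. A minimal sketch of how a caller builds an entry under the new signature, assuming a `TableIdentifier` in scope (the DDL string is illustrative):

```java
// Sketch only: the factory now takes the DDLType enum directly instead of
// a raw String such as DDLType.UPDATE_PROPERTIES.name().
DDLInfo info = DDLInfo.of(
    tableIdentifier,                                                  // assumed in scope
    "ALTER TABLE test_db.test_table SET TBLPROPERTIES ('k1' = 'v1')",
    DDLInfo.DDLType.UPDATE_PROPERTIES,
    System.currentTimeMillis());
```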
diff --git a/ams/server/src/test/java/com/netease/arctic/server/catalog/TestServerCatalog.java b/ams/server/src/test/java/com/netease/arctic/server/catalog/TestServerCatalog.java
new file mode 100644
index 0000000000..71fca7deb4
--- /dev/null
+++ b/ams/server/src/test/java/com/netease/arctic/server/catalog/TestServerCatalog.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.catalog;
+
+import com.netease.arctic.formats.AmoroCatalogTestHelper;
+import com.netease.arctic.formats.IcebergHadoopCatalogTestHelper;
+import com.netease.arctic.formats.PaimonHadoopCatalogTestHelper;
+import com.netease.arctic.hive.formats.IcebergHiveCatalogTestHelper;
+import com.netease.arctic.hive.formats.PaimonHiveCatalogTestHelper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestServerCatalog extends TableCatalogTestBase {
+
+  private final String testDatabaseName = "test_database";
+
+  private final String testTableName = "test_table";
+
+  public TestServerCatalog(AmoroCatalogTestHelper amoroCatalogTestHelper) {
+    super(amoroCatalogTestHelper);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Object[] parameters() {
+    return new Object[] {
+        PaimonHadoopCatalogTestHelper.defaultHelper(),
+        PaimonHiveCatalogTestHelper.defaultHelper(),
+        IcebergHadoopCatalogTestHelper.defaultHelper(),
+        IcebergHiveCatalogTestHelper.defaultHelper()
+    };
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    getAmoroCatalog().createDatabase(testDatabaseName);
+    getAmoroCatalogTestHelper().createTable(testDatabaseName, testTableName);
+  }
+
+  @Test
+  public void listDatabases() {
+    Assert.assertTrue(getExternalCatalog().listDatabases().contains(testDatabaseName));
+  }
+
+  @Test
+  public void databaseExists() {
+    Assert.assertTrue(getExternalCatalog().exist(testDatabaseName));
+  }
+
+  @Test
+  public void tableExists() {
+    Assert.assertTrue(getExternalCatalog().exist(testDatabaseName, testTableName));
+  }
+
+  @Test
+  public void listTables() {
+    Assert.assertEquals(1, getExternalCatalog().listTables(testDatabaseName).size());
+    Assert.assertEquals(testTableName, getExternalCatalog().listTables(testDatabaseName).get(0).getTableName());
+  }
+
+  @Test
+  public void loadTable() {
+    Assert.assertNotNull(getExternalCatalog().loadTable(testDatabaseName, testTableName));
+  }
+
+  private ServerCatalog getExternalCatalog() {
+    return tableService().getServerCatalog(getAmoroCatalogTestHelper().catalogName());
+  }
+}
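All five tests route through the same `ServerCatalog` entry point, so the flow they exercise can be summarized in a few lines. A sketch using names from the test itself; `helper` stands in for the injected `AmoroCatalogTestHelper`:

```java
// Resolve the server-side catalog registered for this helper, then probe it.
ServerCatalog external = tableService().getServerCatalog(helper.catalogName());
Assert.assertTrue(external.listDatabases().contains("test_database"));
Assert.assertTrue(external.exist("test_database"));                 // database exists
Assert.assertTrue(external.exist("test_database", "test_table"));   // table exists
Assert.assertNotNull(external.loadTable("test_database", "test_table"));
```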
diff --git a/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestIcebergServerTableDescriptor.java b/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestIcebergServerTableDescriptor.java
new file mode 100644
index 0000000000..2e17768041
--- /dev/null
+++ b/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.dashboard;
+
+import com.netease.arctic.formats.AmoroCatalogTestHelper;
+import com.netease.arctic.formats.IcebergHadoopCatalogTestHelper;
+import com.netease.arctic.hive.formats.IcebergHiveCatalogTestHelper;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.Types;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergServerTableDescriptor extends TestServerTableDescriptor {
+
+  public TestIcebergServerTableDescriptor(AmoroCatalogTestHelper amoroCatalogTestHelper) {
+    super(amoroCatalogTestHelper);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Object[] parameters() {
+    return new Object[] {
+        IcebergHadoopCatalogTestHelper.defaultHelper(),
+        IcebergHiveCatalogTestHelper.defaultHelper()
+    };
+  }
+
+  @Override
+  protected void tableOperationsAddColumns() {
+    getTable().updateSchema()
+        .allowIncompatibleChanges()
+        .addColumn("new_col", Types.IntegerType.get())
+        .commit();
+  }
+
+  @Override
+  protected void tableOperationsRenameColumns() {
+    getTable().updateSchema()
+        .renameColumn("new_col", "renamed_col")
+        .commit();
+  }
+
+  @Override
+  protected void tableOperationsChangeColumnType() {
+    getTable().updateSchema()
+        .updateColumn("renamed_col", Types.LongType.get())
+        .commit();
+  }
+
+  @Override
+  protected void tableOperationsChangeColumnComment() {
+    getTable().updateSchema()
+        .updateColumn("renamed_col", Types.LongType.get(), "new comment")
+        .commit();
+  }
+
+  @Override
+  protected void tableOperationsChangeColumnRequired() {
+    getTable().updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("renamed_col")
+        .commit();
+  }
+
+  @Override
+  protected void tableOperationsDropColumn() {
+    getTable().updateSchema()
+        .deleteColumn("renamed_col")
+        .commit();
+  }
+
+  private Table getTable() {
+    return (Table) getAmoroCatalog().loadTable(TEST_DB, TEST_TABLE).originalTable();
+  }
+}
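Note that every hook issues exactly one `updateSchema().commit()`: each commit produces a new schema id, which is what lets the descriptor reverse one metadata change into one DDL statement. A hedged sketch of the pattern, assuming an Iceberg `Table` in scope:

```java
// One mutation per commit; chaining several changes before a single commit()
// would yield one new schema, and so presumably one combined DDL entry.
table.updateSchema()
    .addColumn("c1", Types.StringType.get())
    .commit();                                  // -> ADD COLUMNS (c1 string)

table.updateSchema()
    .renameColumn("c1", "c2")
    .commit();                                  // -> RENAME COLUMN c1 TO c2
```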
diff --git a/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestPaimonServerTableDescriptor.java b/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestPaimonServerTableDescriptor.java
new file mode 100644
index 0000000000..dfc1ce2fc2
--- /dev/null
+++ b/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestPaimonServerTableDescriptor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.server.dashboard;
+
+import com.netease.arctic.formats.AmoroCatalogTestHelper;
+import com.netease.arctic.formats.PaimonHadoopCatalogTestHelper;
+import com.netease.arctic.hive.formats.PaimonHiveCatalogTestHelper;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataTypes;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestPaimonServerTableDescriptor extends TestServerTableDescriptor {
+
+  public TestPaimonServerTableDescriptor(AmoroCatalogTestHelper amoroCatalogTestHelper) {
+    super(amoroCatalogTestHelper);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Object[] parameters() {
+    return new Object[] {
+        PaimonHadoopCatalogTestHelper.defaultHelper(),
+        PaimonHiveCatalogTestHelper.defaultHelper()
+    };
+  }
+
+  @Override
+  protected void tableOperationsAddColumns() {
+    try {
+      getCatalog().alterTable(
+          Identifier.create(TEST_DB, TEST_TABLE),
+          SchemaChange.addColumn("new_col", DataTypes.INT()),
+          false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void tableOperationsRenameColumns() {
+    try {
+      getCatalog().alterTable(
+          Identifier.create(TEST_DB, TEST_TABLE),
+          SchemaChange.renameColumn("new_col", "renamed_col"),
+          false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void tableOperationsChangeColumnType() {
+    try {
+      getCatalog().alterTable(
+          Identifier.create(TEST_DB, TEST_TABLE),
+          SchemaChange.updateColumnType("renamed_col", DataTypes.BIGINT()),
+          false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void tableOperationsChangeColumnComment() {
+    try {
+      getCatalog().alterTable(
+          Identifier.create(TEST_DB, TEST_TABLE),
+          SchemaChange.updateColumnComment("renamed_col", "new comment"),
+          false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void tableOperationsChangeColumnRequired() {
+    try {
+      getCatalog().alterTable(
+          Identifier.create(TEST_DB, TEST_TABLE),
+          SchemaChange.updateColumnNullability("renamed_col", false),
+          false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void tableOperationsDropColumn() {
+    try {
+      getCatalog().alterTable(
+          Identifier.create(TEST_DB, TEST_TABLE),
+          SchemaChange.dropColumn("renamed_col"),
+          false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Catalog getCatalog() {
+    return (Catalog) getOriginalCatalog();
+  }
+}
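Every override above repeats the same try/catch around `Catalog#alterTable`. A small hypothetical helper (not part of the patch) would cut the noise while keeping the same single-change-per-call shape:

```java
// Hypothetical convenience wrapper: funnels each SchemaChange through the
// same alterTable(Identifier, SchemaChange, boolean) overload the tests use.
private void alter(SchemaChange change) {
  try {
    getCatalog().alterTable(Identifier.create(TEST_DB, TEST_TABLE), change, false);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

// e.g. alter(SchemaChange.addColumn("new_col", DataTypes.INT()));
```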
b/ams/server/src/test/java/com/netease/arctic/server/dashboard/TestServerTableDescriptor.java
@@ -18,51 +18,110 @@
 
 package com.netease.arctic.server.dashboard;
 
-import com.netease.arctic.BasicTableTestHelper;
-import com.netease.arctic.TableTestHelper;
-import com.netease.arctic.ams.api.TableFormat;
-import com.netease.arctic.catalog.BasicCatalogTestHelper;
-import com.netease.arctic.catalog.CatalogTestHelper;
-import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
-import com.netease.arctic.hive.catalog.HiveTableTestHelper;
+import com.netease.arctic.formats.AmoroCatalogTestHelper;
+import com.netease.arctic.server.catalog.TableCatalogTestBase;
 import com.netease.arctic.server.dashboard.model.DDLInfo;
-import com.netease.arctic.server.table.AMSTableTestBase;
-import com.netease.arctic.table.ArcticTable;
+import com.netease.arctic.server.table.ServerTableIdentifier;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.List;
 
-@RunWith(Parameterized.class)
-public class TestServerTableDescriptor extends AMSTableTestBase {
+public abstract class TestServerTableDescriptor extends TableCatalogTestBase {
 
-  @Parameterized.Parameters(name = "{0}, {1}")
-  public static Object[] parameters() {
-    return new Object[][] {{new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
-        new BasicTableTestHelper(true, true)},
-        {new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()),
-        new HiveTableTestHelper(true, true)}};
+  protected static final String TEST_DB = "test_db";
+
+  protected static final String TEST_TABLE = "test_table";
+
+  public TestServerTableDescriptor(AmoroCatalogTestHelper amoroCatalogTestHelper) {
+    super(amoroCatalogTestHelper);
   }
 
-  public TestServerTableDescriptor(CatalogTestHelper catalogTestHelper,
-                                   TableTestHelper tableTestHelper) {
-    super(catalogTestHelper, tableTestHelper, true);
+  @Before
+  public void before() {
+    getAmoroCatalog().createDatabase(TEST_DB);
+    try {
+      getAmoroCatalogTestHelper().createTable(TEST_DB, TEST_TABLE);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Test
-  public void getTableOperations() {
+  public void tableOperations() throws Exception {
     ServerTableDescriptor serverTableDescriptor = new ServerTableDescriptor(tableService());
-    ArcticTable arcticTable = (ArcticTable) tableService().loadTable(serverTableIdentifier()).originalTable();
-    arcticTable.updateProperties().set("key", "value1").commit();
-    List<DDLInfo> tableOperations = serverTableDescriptor.getTableOperations(serverTableIdentifier());
-    Assert.assertEquals(1, tableOperations.size());
-    DDLInfo ddlInfo = tableOperations.get(0);
-    Assert.assertEquals(ddlInfo.getDdl(),
-        "ALTER TABLE test_catalog.test_db.test_table SET TBLPROPERTIES ('key'='value1')");
-    arcticTable.updateProperties().set("key", "value2").commit();
-    tableOperations = serverTableDescriptor.getTableOperations(serverTableIdentifier());
-    Assert.assertEquals(2, tableOperations.size());
+
+    //add properties
+    getAmoroCatalogTestHelper().setTableProperties(TEST_DB, TEST_TABLE, "k1", "v1");
+
+    //remove properties
+    getAmoroCatalogTestHelper().removeTableProperties(TEST_DB, TEST_TABLE, "k1");
+
+    //add columns
+    tableOperationsAddColumns();
+
+    //rename columns
+    tableOperationsRenameColumns();
+
+    //change columns type
+    tableOperationsChangeColumnType();
+
+    //change columns comment
+    tableOperationsChangeColumnComment();
+
+    //change columns nullable
+    tableOperationsChangeColumnRequired();
+
+    //drop columns
+    tableOperationsDropColumn();
+
+    List<DDLInfo> tableOperations = serverTableDescriptor.getTableOperations(
+        ServerTableIdentifier.of(getAmoroCatalogTestHelper().catalogName(), TEST_DB, TEST_TABLE));
+
+    Assert.assertEquals(
+        tableOperations.get(0).getDdl(),
+        "ALTER TABLE test_table SET TBLPROPERTIES ('k1' = 'v1')");
+
+    Assert.assertEquals(
+        tableOperations.get(1).getDdl(),
+        "ALTER TABLE test_table UNSET TBLPROPERTIES ('k1')");
+
+    Assert.assertTrue(
+        tableOperations.get(2).getDdl()
+            .equalsIgnoreCase("ALTER TABLE test_table ADD COLUMNS (new_col int)"));
+
+    Assert.assertTrue(
+        tableOperations.get(3).getDdl()
+            .equalsIgnoreCase("ALTER TABLE test_table RENAME COLUMN new_col TO renamed_col"));
+
+    Assert.assertTrue(
+        tableOperations.get(4).getDdl()
+            .equalsIgnoreCase("ALTER TABLE test_table ALTER COLUMN renamed_col TYPE BIGINT"));
+
+    Assert.assertTrue(
+        tableOperations.get(5).getDdl()
+            .equalsIgnoreCase("ALTER TABLE test_table ALTER COLUMN renamed_col COMMENT 'new comment'"));
+
+    Assert.assertTrue(
+        tableOperations.get(6).getDdl()
+            .equalsIgnoreCase("ALTER TABLE test_table ALTER COLUMN renamed_col DROP NOT NULL"));
+
+    Assert.assertTrue(
+        tableOperations.get(7).getDdl()
+            .equalsIgnoreCase("ALTER TABLE test_table DROP COLUMN renamed_col")
+    );
   }
+
+  protected abstract void tableOperationsAddColumns();
+
+  protected abstract void tableOperationsRenameColumns();
+
+  protected abstract void tableOperationsChangeColumnType();
+
+  protected abstract void tableOperationsChangeColumnComment();
+
+  protected abstract void tableOperationsChangeColumnRequired();
+
+  protected abstract void tableOperationsDropColumn();
 }
\ No newline at end of file
diff --git a/core/src/test/java/com/netease/arctic/formats/AmoroCatalogTestHelper.java b/core/src/test/java/com/netease/arctic/formats/AmoroCatalogTestHelper.java
index a387564448..db1d606e0f 100644
--- a/core/src/test/java/com/netease/arctic/formats/AmoroCatalogTestHelper.java
+++ b/core/src/test/java/com/netease/arctic/formats/AmoroCatalogTestHelper.java
@@ -82,6 +82,23 @@ public interface AmoroCatalogTestHelper {
    */
   String catalogName();
 
+  /**
+   * Set table property.
+   * @param db database name
+   * @param tableName table name
+   * @param key property key
+   * @param value property value
+   */
+  void setTableProperties(String db, String tableName, String key, String value);
+
+  /**
+   * Remove table property.
+   * @param db database name
+   * @param tableName table name
+   * @param key property key
+   */
+  void removeTableProperties(String db, String tableName, String key);
+
   /**
    * Clean the catalog. drop databases and tables.
   */
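The two new interface methods are what keep `tableOperations()` format-agnostic: the test mutates properties without knowing whether Iceberg or Paimon sits underneath. Usage as the test drives it (sketch; the constants come from `TestServerTableDescriptor`):

```java
// Format-agnostic property round-trip, as the tableOperations() test runs it.
AmoroCatalogTestHelper helper = getAmoroCatalogTestHelper();
helper.setTableProperties(TEST_DB, TEST_TABLE, "k1", "v1");   // -> SET TBLPROPERTIES
helper.removeTableProperties(TEST_DB, TEST_TABLE, "k1");      // -> UNSET TBLPROPERTIES
```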
diff --git a/core/src/test/java/com/netease/arctic/formats/IcebergHadoopCatalogTestHelper.java b/core/src/test/java/com/netease/arctic/formats/IcebergHadoopCatalogTestHelper.java
index 7ec997ed31..a33a7332f9 100644
--- a/core/src/test/java/com/netease/arctic/formats/IcebergHadoopCatalogTestHelper.java
+++ b/core/src/test/java/com/netease/arctic/formats/IcebergHadoopCatalogTestHelper.java
@@ -111,6 +111,16 @@ public String catalogName() {
     return catalogName;
   }
 
+  @Override
+  public void setTableProperties(String db, String tableName, String key, String value) {
+    originalCatalog().loadTable(TableIdentifier.of(db, tableName)).updateProperties().set(key, value).commit();
+  }
+
+  @Override
+  public void removeTableProperties(String db, String tableName, String key) {
+    originalCatalog().loadTable(TableIdentifier.of(db, tableName)).updateProperties().remove(key).commit();
+  }
+
   @Override
   public void clean() {
     Catalog catalog = originalCatalog();
diff --git a/core/src/test/java/com/netease/arctic/formats/PaimonHadoopCatalogTestHelper.java b/core/src/test/java/com/netease/arctic/formats/PaimonHadoopCatalogTestHelper.java
index 31da2ae9c8..aa06f9af64 100644
--- a/core/src/test/java/com/netease/arctic/formats/PaimonHadoopCatalogTestHelper.java
+++ b/core/src/test/java/com/netease/arctic/formats/PaimonHadoopCatalogTestHelper.java
@@ -29,6 +29,7 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataTypes;
 
 import java.util.HashMap;
@@ -95,6 +96,30 @@ public String catalogName() {
     return catalogName;
   }
 
+  @Override
+  public void setTableProperties(String db, String tableName, String key, String value) {
+    try {
+      originalCatalog().alterTable(
+          Identifier.create(db, tableName),
+          SchemaChange.setOption(key, value),
+          true);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void removeTableProperties(String db, String tableName, String key) {
+    try {
+      originalCatalog().alterTable(
+          Identifier.create(db, tableName),
+          SchemaChange.removeOption(key),
+          true);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public void clean() {
     try (Catalog catalog = originalCatalog()) {
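The two implementations reach the same DDL through different APIs: Iceberg treats properties as table metadata behind a commit builder, while Paimon models them as schema options. A side-by-side sketch, with catalogs assumed in scope and checked exceptions propagated:

```java
void setProperty(org.apache.iceberg.catalog.Catalog iceberg,
                 org.apache.paimon.catalog.Catalog paimon) throws Exception {
  // Iceberg: property edits are a metadata commit on the loaded table.
  iceberg.loadTable(org.apache.iceberg.catalog.TableIdentifier.of("db", "t"))
      .updateProperties().set("k1", "v1").commit();

  // Paimon: property edits are SchemaChange options applied via the catalog.
  paimon.alterTable(
      org.apache.paimon.catalog.Identifier.create("db", "t"),
      org.apache.paimon.schema.SchemaChange.setOption("k1", "v1"),
      true);  // ignoreIfNotExists
}
```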