[AMORO-1877] Support displaying Paimon operations on the 'operations' page (#…
shidayang authored Oct 13, 2023
1 parent 8bc1134 commit 2dd46f1
Showing 21 changed files with 1,653 additions and 348 deletions.
2 changes: 2 additions & 0 deletions LICENSE
@@ -231,6 +231,8 @@ This product includes code from Apache Iceberg.
* org.apache.iceberg.spark.data.ParquetWithSparkSchemaVisitor copied and modified to org.apache.iceberg.spark.data.AdaptHiveParquetWithSparkSchemaVisitor
* org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor copied and modified to org.apache.iceberg.flink.data.AdaptHiveParquetWithFlinkSchemaVisitor
* org.apache.iceberg.parquet.ParquetUtil copied and modified to org.apache.iceberg.parquet.AdaptHiveParquetUtil
* org.apache.paimon.spark.SparkTypeUtils copied and modified to com.netease.arctic.server.dashboard.component.reverser.PaimonTypeToSparkType
* org.apache.iceberg.spark.TypeToSparkType copied and modified to com.netease.arctic.server.dashboard.component.reverser.IcebergTypeToSparkType



@@ -23,6 +23,8 @@
import com.netease.arctic.data.DataFileType;
import com.netease.arctic.data.FileNameRules;
import com.netease.arctic.op.SnapshotSummary;
import com.netease.arctic.server.dashboard.component.reverser.DDLReverser;
import com.netease.arctic.server.dashboard.component.reverser.IcebergTableMetaExtract;
import com.netease.arctic.server.dashboard.model.AMSColumnInfo;
import com.netease.arctic.server.dashboard.model.AMSPartitionField;
import com.netease.arctic.server.dashboard.model.DDLInfo;
@@ -44,13 +46,11 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.InternalRecordWrapper;
@@ -66,10 +66,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
@@ -230,62 +228,16 @@ public List<PartitionFileBaseInfo> getTransactionDetail(AmoroTable<?> amoroTable

public List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable) {
ArcticTable arcticTable = getTable(amoroTable);
List<DDLInfo> result = new ArrayList<>();
Table table;
if (arcticTable.isKeyedTable()) {
table = arcticTable.asKeyedTable().baseTable();
} else {
table = arcticTable.asUnkeyedTable();
}
List<HistoryEntry> snapshotLog = ((HasTableOperations) table).operations().current().snapshotLog();
List<org.apache.iceberg.TableMetadata.MetadataLogEntry> metadataLogEntries =
((HasTableOperations) table).operations().current().previousFiles();
Set<Long> time = new HashSet<>();
snapshotLog.forEach(e -> time.add(e.timestampMillis()));
String lastMetadataLogEntryFile = null;
org.apache.iceberg.TableMetadata lastTableMetadata = null;
for (int i = 1; i < metadataLogEntries.size(); i++) {
org.apache.iceberg.TableMetadata.MetadataLogEntry currentEntry = metadataLogEntries.get(i);
if (!time.contains(currentEntry.timestampMillis())) {
org.apache.iceberg.TableMetadata.MetadataLogEntry previousEntry = metadataLogEntries.get(i - 1);
org.apache.iceberg.TableMetadata oldTableMetadata;
if (lastMetadataLogEntryFile == null || !lastMetadataLogEntryFile.equals(previousEntry.file())) {
oldTableMetadata = TableMetadataParser.read(table.io(), previousEntry.file());
} else {
oldTableMetadata = lastTableMetadata;
}

org.apache.iceberg.TableMetadata
newTableMetadata = TableMetadataParser.read(table.io(), currentEntry.file());
lastMetadataLogEntryFile = currentEntry.file();
lastTableMetadata = newTableMetadata;

DDLInfo.Generator generator = new DDLInfo.Generator();
result.addAll(generator.tableIdentify(arcticTable.id())
.oldMeta(oldTableMetadata)
.newMeta(newTableMetadata)
.generate());
}
}
if (metadataLogEntries.size() > 0) {
org.apache.iceberg.TableMetadata.MetadataLogEntry previousEntry = metadataLogEntries
.get(metadataLogEntries.size() - 1);
org.apache.iceberg.TableMetadata oldTableMetadata;

if (lastMetadataLogEntryFile == null || !lastMetadataLogEntryFile.equals(previousEntry.file())) {
oldTableMetadata = TableMetadataParser.read(table.io(), previousEntry.file());
} else {
oldTableMetadata = lastTableMetadata;
}

org.apache.iceberg.TableMetadata newTableMetadata = ((HasTableOperations) table).operations().current();
DDLInfo.Generator generator = new DDLInfo.Generator();
result.addAll(generator.tableIdentify(arcticTable.id())
.oldMeta(oldTableMetadata)
.newMeta(newTableMetadata)
.generate());
}
return result;
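// Extract the Iceberg table's metadata history and reverse it into DDL statements.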
IcebergTableMetaExtract extract = new IcebergTableMetaExtract();
DDLReverser<Table> ddlReverser = new DDLReverser<>(extract);
return ddlReverser.reverse(table, amoroTable.id());
}

@Override
@@ -20,12 +20,15 @@

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.dashboard.component.reverser.DDLReverser;
import com.netease.arctic.server.dashboard.component.reverser.PaimonTableMetaExtract;
import com.netease.arctic.server.dashboard.model.DDLInfo;
import com.netease.arctic.server.dashboard.model.PartitionBaseInfo;
import com.netease.arctic.server.dashboard.model.PartitionFileBaseInfo;
import com.netease.arctic.server.dashboard.model.ServerTableMeta;
import com.netease.arctic.server.dashboard.model.TransactionsOfTable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.paimon.table.DataTable;

import java.util.List;

@@ -55,7 +58,10 @@ public List<PartitionFileBaseInfo> getTransactionDetail(AmoroTable<?> amoroTable

@Override
public List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable) {
throw new UnsupportedOperationException();
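// Extract the Paimon table's metadata history and reverse it into DDL statements.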
DataTable table = getTable(amoroTable);
PaimonTableMetaExtract extract = new PaimonTableMetaExtract();
DDLReverser<DataTable> ddlReverser = new DDLReverser<>(extract);
return ddlReverser.reverse(table, amoroTable.id());
}

@Override
@@ -67,4 +73,8 @@ public List<PartitionBaseInfo> getTablePartition(AmoroTable<?> amoroTable) {
public List<PartitionFileBaseInfo> getTableFile(AmoroTable<?> amoroTable, String partition) {
throw new UnsupportedOperationException();
}

private DataTable getTable(AmoroTable<?> amoroTable) {
return (DataTable) amoroTable.originalTable();
}
}
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server.dashboard.component.reverser;

import com.google.common.collect.Maps;
import com.netease.arctic.server.dashboard.model.DDLInfo;
import com.netease.arctic.table.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

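/**
 * Reverses a table's metadata history into DDL statements.
 * Adjacent metadata versions are diffed pairwise, and property and schema changes are
 * rendered as SQL through a {@link MetadataChangeHandler} (currently the Spark dialect).
 */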
public class DDLReverser<T> {

private final TableMetaExtract<T> tableMetaExtract;

public DDLReverser(TableMetaExtract<T> tableMetaExtract) {
this.tableMetaExtract = tableMetaExtract;
}

public List<DDLInfo> reverse(T table, TableIdentifier tableIdentifier) {

// Currently only Spark-dialect DDL is generated for metadata changes.
SparkMetadataChangeHandler metadataChangeHandler =
new SparkMetadataChangeHandler(tableIdentifier.getTableName());

List<TableMetaExtract.InternalTableMeta> internalTableMetas = tableMetaExtract.extractTable(table);
// A diff needs at least two metadata versions.
if (internalTableMetas.size() < 2) {
return Collections.emptyList();
}

List<DDLInfo> result = new ArrayList<>();

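// Diff each adjacent pair of metadata versions in the order they were extracted.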
for (int i = 1; i < internalTableMetas.size(); i++) {
TableMetaExtract.InternalTableMeta pre = internalTableMetas.get(i - 1);
TableMetaExtract.InternalTableMeta current = internalTableMetas.get(i);

compareProperties(pre.getProperties(), current.getProperties(), metadataChangeHandler)
.forEach(sql -> result.add(
DDLInfo.of(tableIdentifier, sql, DDLInfo.DDLType.UPDATE_PROPERTIES, current.getTime()))
);

compareSchemas(pre.getInternalSchema(), current.getInternalSchema(), metadataChangeHandler)
.forEach(sql -> result.add(
DDLInfo.of(tableIdentifier, sql, DDLInfo.DDLType.UPDATE_SCHEMA, current.getTime()))
);
}
return result;
}

private List<String> compareProperties(
Map<String, String> pre,
Map<String, String> current,
MetadataChangeHandler metadataChangeHandler) {
// A single Java API call may bundle several changes, so one metadata diff
// can reverse into multiple SQL statements.
List<String> result = new ArrayList<>();

// Remove properties
Set<String> removeProperties = Sets.difference(pre.keySet(), current.keySet());
if (!removeProperties.isEmpty()) {
result.add(metadataChangeHandler.removeProperties(removeProperties));
}

// Change and add properties
Map<String, String> changeAndAddProperties = Maps.newHashMap();
for (Map.Entry<String, String> currentEntry : current.entrySet()) {
String key = currentEntry.getKey();
String value = currentEntry.getValue();
if (pre.containsKey(key)) {
if (!pre.get(key).equals(value)) {
changeAndAddProperties.put(key, value);
}
} else {
changeAndAddProperties.put(key, value);
}
}
if (!changeAndAddProperties.isEmpty()) {
result.add(metadataChangeHandler.changeAndAddProperties(changeAndAddProperties));
}

return result;
}

private List<String> compareSchemas(List<TableMetaExtract.InternalSchema> pre,
List<TableMetaExtract.InternalSchema> current, MetadataChangeHandler metadataChangeHandler) {
// A single Java API call may bundle several changes, so one metadata diff
// can reverse into multiple SQL statements.
List<String> result = new ArrayList<>();

Map<Integer, TableMetaExtract.InternalSchema> preMap = pre.stream().collect(
Collectors.toMap(TableMetaExtract.InternalSchema::getId, v -> v));
Map<Integer, TableMetaExtract.InternalSchema> currentMap = current.stream().collect(Collectors.toMap(
TableMetaExtract.InternalSchema::getId, v -> v));

// Remove columns.
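// A nested column is skipped when its parent is also removed; dropping the parent drops its children.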
Set<Integer> removeColumns = Sets.difference(preMap.keySet(), currentMap.keySet());
Set<Integer> normalizedRemoveColumns = removeColumns.stream()
.filter(s -> !removeColumns.contains(preMap.get(s).getParentId()))
.collect(Collectors.toSet());
if (!normalizedRemoveColumns.isEmpty()) {
result.add(
metadataChangeHandler
.dropColumns(
normalizedRemoveColumns
.stream()
.map(preMap::get)
.map(TableMetaExtract.InternalSchema::getName)
.collect(Collectors.toSet())
)
);
}

// Add new columns.
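// Likewise, only top-level additions are emitted; adding a parent column adds its children with it.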
Set<Integer> newColumns = Sets.difference(currentMap.keySet(), preMap.keySet());
Set<Integer> normalizedNewColumns = newColumns.stream()
.filter(s -> !newColumns.contains(currentMap.get(s).getParentId()))
.collect(Collectors.toSet());
if (!normalizedNewColumns.isEmpty()) {
result.add(
metadataChangeHandler
.addNewColumns(
normalizedNewColumns
.stream()
.map(currentMap::get)
.collect(Collectors.toList())
)
);
}

// Change columns.
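// Columns present in both versions are matched by field id and compared field by field.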
for (TableMetaExtract.InternalSchema currentSchema : current) {
if (!preMap.containsKey(currentSchema.getId())) {
continue;
}

TableMetaExtract.InternalSchema preSchema = preMap.get(currentSchema.getId());
if (!Objects.equals(preSchema.getName(), currentSchema.getName())) {
result.add(metadataChangeHandler.renameColumnName(preSchema.getName(), currentSchema.getName()));
}
if (!Objects.equals(preSchema.getType(), currentSchema.getType())) {
result.add(metadataChangeHandler.changeColumnType(currentSchema.getName(), currentSchema.getType()));
}
if (!Objects.equals(preSchema.getComment(), currentSchema.getComment())) {
result.add(metadataChangeHandler.changeColumnsComment(currentSchema.getName(), currentSchema.getComment()));
}
if (preSchema.isRequired() != currentSchema.isRequired()) {
result.add(metadataChangeHandler.changeColumnsRequire(currentSchema.getName(), currentSchema.isRequired()));
}
}

return result;
}
}