[AMORO-1877] Support displaying Paimon operations on the 'Operations' page #2079

Merged: 50 commits from paimon-ddl-history into master, Oct 13, 2023

Changes from all commits

Commits (50):
d2ca938  Integration paimon (shidayang, Sep 12, 2023)
3f93f96  Merge remote-tracking branch 'origin/master' into paimon-integration-… (shidayang, Sep 12, 2023)
59be5cd  Integration paimon (shidayang, Sep 12, 2023)
4639621  Integration paimon (shidayang, Sep 13, 2023)
06926dc  Integration paimon (shidayang, Sep 13, 2023)
0df88f7  tmp (shidayang, Sep 13, 2023)
ecaa102  Support registering for the Paimon Catalog (shidayang, Sep 13, 2023)
ebe4b7e  Merge branch 'master' into paimon-integration-copy (shidayang, Sep 13, 2023)
0c6b09a  Support registering for the Paimon Catalog (shidayang, Sep 14, 2023)
da3a94c  Merge remote-tracking branch 'shidayang/paimon-integration-copy' into… (shidayang, Sep 14, 2023)
5f66917  Merge branch 'master' into paimon-integration-copy (shidayang, Sep 14, 2023)
4bb61cf  Support registering for the Paimon Catalog (shidayang, Sep 14, 2023)
55de2ae  Merge remote-tracking branch 'shidayang/paimon-integration-copy' into… (shidayang, Sep 14, 2023)
2b37a6e  Support registering for the Paimon Catalog (shidayang, Sep 14, 2023)
cb0c241  Fix ut (shidayang, Sep 14, 2023)
587d23f  Polish code (shidayang, Sep 15, 2023)
bde9ec9  Merge branch 'master' into paimon-integration-copy (shidayang, Sep 15, 2023)
9720442  Fix compile error (shidayang, Sep 15, 2023)
80e9a75  Merge remote-tracking branch 'shidayang/paimon-integration-copy' into… (shidayang, Sep 15, 2023)
f1df45a  Merge remote-tracking branch 'origin/master' into paimon-integration-… (shidayang, Sep 19, 2023)
ee9c632  Merge master (shidayang, Sep 19, 2023)
b3df91a  polish code (shidayang, Sep 20, 2023)
9ccfaed  Merge remote-tracking branch 'origin/master' into paimon-integration-… (shidayang, Sep 20, 2023)
2aea58f  Merge master (shidayang, Sep 20, 2023)
2b8088f  Fix UT (shidayang, Sep 20, 2023)
7ec198e  Merge branch 'master' into paimon-integration-copy (shidayang, Sep 20, 2023)
722f0ff  Polish code (shidayang, Sep 25, 2023)
2686ef3  Merge branch 'master' into paimon-integration-copy (shidayang, Sep 25, 2023)
cfc36ce  Polish code (shidayang, Sep 25, 2023)
b640a26  tmp (shidayang, Oct 8, 2023)
52c7923  polish code (shidayang, Oct 9, 2023)
3e5982e  Merge remote-tracking branch 'origin/master' into paimon-integration-… (shidayang, Oct 9, 2023)
d43afd1  Merge remote-tracking branch 'origin/master' into paimon-integration-… (shidayang, Oct 9, 2023)
4c5457a  polish code (shidayang, Oct 9, 2023)
82cf7ec  Merge branch 'paimon-integration-copy' into paimon-ddl-history (shidayang, Oct 9, 2023)
d8b769e  Merge branch 'master' into paimon-integration-copy (zhoujinsong, Oct 9, 2023)
c24efa4  tmp (shidayang, Oct 9, 2023)
661b136  Update ams/server/src/main/java/com/netease/arctic/server/dashboard/M… (shidayang, Oct 9, 2023)
2306f02  Update ams/server/src/main/java/com/netease/arctic/server/dashboard/P… (shidayang, Oct 9, 2023)
5f87798  tmp (shidayang, Oct 9, 2023)
0b2c393  polish code (shidayang, Oct 9, 2023)
a5beb36  polish code (shidayang, Oct 9, 2023)
be36a08  Merge remote-tracking branch 'origin/master' into paimon-ddl-history (shidayang, Oct 10, 2023)
36b6923  Merge branch 'paimon-integration-copy' into paimon-ddl-history (shidayang, Oct 10, 2023)
b445be3  polish code (shidayang, Oct 11, 2023)
c234724  Merge remote-tracking branch 'origin/master' into paimon-ddl-history (shidayang, Oct 11, 2023)
261fb1c  polish code (shidayang, Oct 11, 2023)
65fd126  Merge branch 'master' into paimon-ddl-history (shidayang, Oct 11, 2023)
bf6325c  Merge branch 'master' into paimon-ddl-history (baiyangtx, Oct 11, 2023)
11d3f0b  polish code (shidayang, Oct 11, 2023)
2 changes: 2 additions & 0 deletions LICENSE

@@ -231,6 +231,8 @@ This product includes code from Apache Iceberg.
* org.apache.iceberg.spark.data.ParquetWithSparkSchemaVisitor copied and modified to org.apache.iceberg.spark.data.AdaptHiveParquetWithSparkSchemaVisitor
* org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor copied and modified to org.apache.iceberg.flink.data.AdaptHiveParquetWithFlinkSchemaVisitor
* org.apache.iceberg.parquet.ParquetUtil copied and modified to org.apache.iceberg.parquet.AdaptHiveParquetUtil
+* org.apache.paimon.spark.SparkTypeUtils copied and modified to com.netease.arctic.server.dashboard.component.reverser.PaimonTypeToSparkType
+* org.apache.iceberg.spark.TypeToSparkType copied and modified to com.netease.arctic.server.dashboard.component.reverser.IcebergTypeToSparkType
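The two newly attributed classes are what let the dashboard print DDL in Spark SQL syntax: each format's native schema types must first be rendered as Spark type names. As a rough, hypothetical illustration of the idea only (the PR's actual visitors are the copied classes named above):

// Illustrative sketch, not the PR's code: a TypeToSparkType-style helper
// renders an engine's type names in Spark SQL form for DDL display.
final class TypeNameSketch {
  static String toSparkTypeName(String engineTypeName) {
    switch (engineTypeName.toUpperCase(java.util.Locale.ROOT)) {
      case "VARCHAR":
      case "STRING":
        return "string";
      case "BIGINT":
        return "bigint";
      case "INT":
        return "int";
      default:
        // Many type names already coincide with Spark's spelling.
        return engineTypeName.toLowerCase(java.util.Locale.ROOT);
    }
  }

  private TypeNameSketch() {}
}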
@@ -23,6 +23,8 @@
import com.netease.arctic.data.DataFileType;
import com.netease.arctic.data.FileNameRules;
import com.netease.arctic.op.SnapshotSummary;
+import com.netease.arctic.server.dashboard.component.reverser.DDLReverser;
+import com.netease.arctic.server.dashboard.component.reverser.IcebergTableMetaExtract;
import com.netease.arctic.server.dashboard.model.AMSColumnInfo;
import com.netease.arctic.server.dashboard.model.AMSPartitionField;
import com.netease.arctic.server.dashboard.model.DDLInfo;
@@ -44,13 +46,11 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.InternalRecordWrapper;
@@ -66,10 +66,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;

/**
@@ -230,62 +228,16 @@ public List<PartitionFileBaseInfo> getTransactionDetail(AmoroTable<?> amoroTable

  public List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable) {
    ArcticTable arcticTable = getTable(amoroTable);
-    List<DDLInfo> result = new ArrayList<>();
    Table table;
    if (arcticTable.isKeyedTable()) {
      table = arcticTable.asKeyedTable().baseTable();
    } else {
      table = arcticTable.asUnkeyedTable();
    }
-    List<HistoryEntry> snapshotLog = ((HasTableOperations) table).operations().current().snapshotLog();
-    List<org.apache.iceberg.TableMetadata.MetadataLogEntry> metadataLogEntries =
-        ((HasTableOperations) table).operations().current().previousFiles();
-    Set<Long> time = new HashSet<>();
-    snapshotLog.forEach(e -> time.add(e.timestampMillis()));
-    String lastMetadataLogEntryFile = null;
-    org.apache.iceberg.TableMetadata lastTableMetadata = null;
-    for (int i = 1; i < metadataLogEntries.size(); i++) {
-      org.apache.iceberg.TableMetadata.MetadataLogEntry currentEntry = metadataLogEntries.get(i);
-      if (!time.contains(currentEntry.timestampMillis())) {
-        org.apache.iceberg.TableMetadata.MetadataLogEntry previousEntry = metadataLogEntries.get(i - 1);
-        org.apache.iceberg.TableMetadata oldTableMetadata;
-        if (lastMetadataLogEntryFile == null || !lastMetadataLogEntryFile.equals(previousEntry.file())) {
-          oldTableMetadata = TableMetadataParser.read(table.io(), previousEntry.file());
-        } else {
-          oldTableMetadata = lastTableMetadata;
-        }
-
-        org.apache.iceberg.TableMetadata
-            newTableMetadata = TableMetadataParser.read(table.io(), currentEntry.file());
-        lastMetadataLogEntryFile = currentEntry.file();
-        lastTableMetadata = newTableMetadata;
-
-        DDLInfo.Generator generator = new DDLInfo.Generator();
-        result.addAll(generator.tableIdentify(arcticTable.id())
-            .oldMeta(oldTableMetadata)
-            .newMeta(newTableMetadata)
-            .generate());
-      }
-    }
-    if (metadataLogEntries.size() > 0) {
-      org.apache.iceberg.TableMetadata.MetadataLogEntry previousEntry = metadataLogEntries
-          .get(metadataLogEntries.size() - 1);
-      org.apache.iceberg.TableMetadata oldTableMetadata;
-
-      if (lastMetadataLogEntryFile == null || !lastMetadataLogEntryFile.equals(previousEntry.file())) {
-        oldTableMetadata = TableMetadataParser.read(table.io(), previousEntry.file());
-      } else {
-        oldTableMetadata = lastTableMetadata;
-      }
-
-      org.apache.iceberg.TableMetadata newTableMetadata = ((HasTableOperations) table).operations().current();
-      DDLInfo.Generator generator = new DDLInfo.Generator();
-      result.addAll(generator.tableIdentify(arcticTable.id())
-          .oldMeta(oldTableMetadata)
-          .newMeta(newTableMetadata)
-          .generate());
-    }
-    return result;
+    IcebergTableMetaExtract extract = new IcebergTableMetaExtract();
+    DDLReverser<Table> ddlReverser = new DDLReverser<>(extract);
+    return ddlReverser.reverse(table, amoroTable.id());
  }

@Override
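Both descriptors now obtain DDL history through DDLReverser, parameterized by a TableMetaExtract strategy (IcebergTableMetaExtract here, PaimonTableMetaExtract in the Paimon descriptor below). The interface itself is not part of this diff; the sketch below is reconstructed from its call sites in DDLReverser, so the constructors and the String-typed fields are assumptions, not the PR's exact code.

package com.netease.arctic.server.dashboard.component.reverser;

import java.util.List;
import java.util.Map;

// Hedged reconstruction from call sites: a TableMetaExtract turns a table
// into a chronologically ordered list of metadata versions for diffing.
public interface TableMetaExtract<T> {

  List<InternalTableMeta> extractTable(T table);

  class InternalTableMeta {
    private final long time;                           // version timestamp (millis)
    private final Map<String, String> properties;      // table properties at this version
    private final List<InternalSchema> internalSchema; // flattened schema at this version

    public InternalTableMeta(
        long time, Map<String, String> properties, List<InternalSchema> internalSchema) {
      this.time = time;
      this.properties = properties;
      this.internalSchema = internalSchema;
    }

    public long getTime() {
      return time;
    }

    public Map<String, String> getProperties() {
      return properties;
    }

    public List<InternalSchema> getInternalSchema() {
      return internalSchema;
    }
  }

  class InternalSchema {
    private final int id;           // stable field id, used to match columns across versions
    private final Integer parentId; // id of the enclosing struct, null for top-level columns
    private final String name;
    private final String type;      // rendered type; the PR likely uses Spark type strings
    private final String comment;
    private final boolean required;

    public InternalSchema(
        int id, Integer parentId, String name, String type, String comment, boolean required) {
      this.id = id;
      this.parentId = parentId;
      this.name = name;
      this.type = type;
      this.comment = comment;
      this.required = required;
    }

    public int getId() {
      return id;
    }

    public Integer getParentId() {
      return parentId;
    }

    public String getName() {
      return name;
    }

    public String getType() {
      return type;
    }

    public String getComment() {
      return comment;
    }

    public boolean isRequired() {
      return required;
    }
  }
}

Under this split, DDLReverser owns the diffing logic while each format only supplies ordered metadata snapshots.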
@@ -20,12 +20,15 @@

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.server.dashboard.component.reverser.DDLReverser;
+import com.netease.arctic.server.dashboard.component.reverser.PaimonTableMetaExtract;
import com.netease.arctic.server.dashboard.model.DDLInfo;
import com.netease.arctic.server.dashboard.model.PartitionBaseInfo;
import com.netease.arctic.server.dashboard.model.PartitionFileBaseInfo;
import com.netease.arctic.server.dashboard.model.ServerTableMeta;
import com.netease.arctic.server.dashboard.model.TransactionsOfTable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.paimon.table.DataTable;

import java.util.List;

@@ -55,7 +58,10 @@ public List<PartitionFileBaseInfo> getTransactionDetail(AmoroTable<?> amoroTable

  @Override
  public List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable) {
-    throw new UnsupportedOperationException();
+    DataTable table = getTable(amoroTable);
+    PaimonTableMetaExtract extract = new PaimonTableMetaExtract();
+    DDLReverser<DataTable> ddlReverser = new DDLReverser<>(extract);
+    return ddlReverser.reverse(table, amoroTable.id());
  }

@Override
@@ -67,4 +73,8 @@ public List<PartitionBaseInfo> getTablePartition(AmoroTable<?> amoroTable) {
  public List<PartitionFileBaseInfo> getTableFile(AmoroTable<?> amoroTable, String partition) {
    throw new UnsupportedOperationException();
  }
+
+  private DataTable getTable(AmoroTable<?> amoroTable) {
+    return (DataTable) amoroTable.originalTable();
+  }
}
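PaimonTableMetaExtract is referenced here but not included in the diff. Below is a minimal sketch of the step it most plausibly performs, assuming it reads Paimon's schema history through SchemaManager and that DataTable exposes fileIO() and location(), as FileStoreTable does; both points are assumptions, not confirmed by this excerpt.

package com.netease.arctic.server.dashboard.component.reverser;

import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: lists every historical schema of a Paimon table,
// oldest first, as the raw input for building InternalTableMeta versions.
public class PaimonSchemaHistory {

  public static List<TableSchema> listSchemas(DataTable table) {
    // Paimon persists each schema version as a file under <table-path>/schema/;
    // SchemaManager can list them all.
    SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
    return schemaManager.listAll().stream()
        .sorted(Comparator.comparingLong(TableSchema::id))
        .collect(Collectors.toList());
  }
}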
175 changes: 175 additions & 0 deletions DDLReverser.java (new file, package com.netease.arctic.server.dashboard.component.reverser)
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server.dashboard.component.reverser;

import com.google.common.collect.Maps;
import com.netease.arctic.server.dashboard.model.DDLInfo;
import com.netease.arctic.table.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class DDLReverser<T> {

  private final TableMetaExtract<T> tableMetaExtract;

  public DDLReverser(TableMetaExtract<T> tableMetaExtract) {
    this.tableMetaExtract = tableMetaExtract;
  }

  public List<DDLInfo> reverse(T table, TableIdentifier tableIdentifier) {

    // Currently only Spark-flavored DDL is generated from metadata changes.
    SparkMetadataChangeHandler metadataChangeHandler =
        new SparkMetadataChangeHandler(tableIdentifier.getTableName());

    List<TableMetaExtract.InternalTableMeta> internalTableMetas = tableMetaExtract.extractTable(table);
    if (internalTableMetas.size() <= 1) {
      // At least two metadata versions are needed to produce a diff.
      return Collections.emptyList();
    }

    List<DDLInfo> result = new ArrayList<>();

    for (int i = 1; i < internalTableMetas.size(); i++) {
      TableMetaExtract.InternalTableMeta pre = internalTableMetas.get(i - 1);
      TableMetaExtract.InternalTableMeta current = internalTableMetas.get(i);

      compareProperties(pre.getProperties(), current.getProperties(), metadataChangeHandler)
          .forEach(sql -> result.add(
              DDLInfo.of(tableIdentifier, sql, DDLInfo.DDLType.UPDATE_PROPERTIES, current.getTime())));

      compareSchemas(pre.getInternalSchema(), current.getInternalSchema(), metadataChangeHandler)
          .forEach(sql -> result.add(
              DDLInfo.of(tableIdentifier, sql, DDLInfo.DDLType.UPDATE_SCHEMA, current.getTime())));
    }
    return result;
  }

  private List<String> compareProperties(
      Map<String, String> pre,
      Map<String, String> current,
      MetadataChangeHandler metadataChangeHandler) {
    // Although only one SQL statement can be executed at a time, a single
    // Java API modification can have the effect of several SQL statements.
    List<String> result = new ArrayList<>();

    // Removed properties.
    Set<String> removeProperties = Sets.difference(pre.keySet(), current.keySet());
    if (!removeProperties.isEmpty()) {
      result.add(metadataChangeHandler.removeProperties(removeProperties));
    }

    // Changed and added properties.
    Map<String, String> changeAndAddProperties = Maps.newHashMap();
    for (Map.Entry<String, String> currentEntry : current.entrySet()) {
      String key = currentEntry.getKey();
      String value = currentEntry.getValue();
      if (pre.containsKey(key)) {
        if (!pre.get(key).equals(value)) {
          changeAndAddProperties.put(key, value);
        }
      } else {
        changeAndAddProperties.put(key, value);
      }
    }
    if (!changeAndAddProperties.isEmpty()) {
      result.add(metadataChangeHandler.changeAndAddProperties(changeAndAddProperties));
    }

    return result;
  }

  private List<String> compareSchemas(
      List<TableMetaExtract.InternalSchema> pre,
      List<TableMetaExtract.InternalSchema> current,
      MetadataChangeHandler metadataChangeHandler) {
    // Although only one SQL statement can be executed at a time, a single
    // Java API modification can have the effect of several SQL statements.
    List<String> result = new ArrayList<>();

    Map<Integer, TableMetaExtract.InternalSchema> preMap = pre.stream()
        .collect(Collectors.toMap(TableMetaExtract.InternalSchema::getId, v -> v));
    Map<Integer, TableMetaExtract.InternalSchema> currentMap = current.stream()
        .collect(Collectors.toMap(TableMetaExtract.InternalSchema::getId, v -> v));

    // Removed columns. A nested column is skipped when its parent is also
    // removed, so only the top-most DROP is emitted.
    Set<Integer> removeColumns = Sets.difference(preMap.keySet(), currentMap.keySet());
    Set<Integer> normalizedRemoveColumns = removeColumns.stream()
        .filter(s -> !removeColumns.contains(preMap.get(s).getParentId()))
        .collect(Collectors.toSet());
    if (!normalizedRemoveColumns.isEmpty()) {
      result.add(
          metadataChangeHandler.dropColumns(
              normalizedRemoveColumns.stream()
                  .map(preMap::get)
                  .map(TableMetaExtract.InternalSchema::getName)
                  .collect(Collectors.toSet())));
    }

    // Added columns. Likewise, only the top-most ADD is emitted for nested columns.
    Set<Integer> newColumns = Sets.difference(currentMap.keySet(), preMap.keySet());
    Set<Integer> normalizedNewColumns = newColumns.stream()
        .filter(s -> !newColumns.contains(currentMap.get(s).getParentId()))
        .collect(Collectors.toSet());
    if (!normalizedNewColumns.isEmpty()) {
      result.add(
          metadataChangeHandler.addNewColumns(
              normalizedNewColumns.stream()
                  .map(currentMap::get)
                  .collect(Collectors.toList())));
    }

    // Changed columns: rename, type change, comment change, nullability change.
    for (TableMetaExtract.InternalSchema currentSchema : current) {
      if (!preMap.containsKey(currentSchema.getId())) {
        continue;
      }

      TableMetaExtract.InternalSchema preSchema = preMap.get(currentSchema.getId());
      if (!Objects.equals(preSchema.getName(), currentSchema.getName())) {
        result.add(metadataChangeHandler.renameColumnName(preSchema.getName(), currentSchema.getName()));
      }
      if (!Objects.equals(preSchema.getType(), currentSchema.getType())) {
        result.add(metadataChangeHandler.changeColumnType(currentSchema.getName(), currentSchema.getType()));
      }
      if (!Objects.equals(preSchema.getComment(), currentSchema.getComment())) {
        result.add(metadataChangeHandler.changeColumnsComment(currentSchema.getName(), currentSchema.getComment()));
      }
      if (preSchema.isRequired() != currentSchema.isRequired()) {
        result.add(metadataChangeHandler.changeColumnsRequire(currentSchema.getName(), currentSchema.isRequired()));
      }
    }

    return result;
  }
}
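Neither MetadataChangeHandler nor SparkMetadataChangeHandler appears in this excerpt. The interface below is reconstructed from DDLReverser's call sites, with String standing in for the column type parameter; the SQL fragments in the comments are only illustrations of what a Spark-flavored implementation might emit, not the PR's actual output.

package com.netease.arctic.server.dashboard.component.reverser;

import java.util.List;
import java.util.Map;
import java.util.Set;

// Reconstructed from DDLReverser's call sites: one method per kind of
// metadata change, each returning a single DDL statement.
interface MetadataChangeHandler {

  // e.g. ALTER TABLE t UNSET TBLPROPERTIES ('k1', 'k2')
  String removeProperties(Set<String> keys);

  // e.g. ALTER TABLE t SET TBLPROPERTIES ('k'='v')
  String changeAndAddProperties(Map<String, String> properties);

  // e.g. ALTER TABLE t DROP COLUMNS (c1, c2)
  String dropColumns(Set<String> columnNames);

  // e.g. ALTER TABLE t ADD COLUMNS (c1 int, c2 string)
  String addNewColumns(List<TableMetaExtract.InternalSchema> columns);

  // e.g. ALTER TABLE t RENAME COLUMN old TO new
  String renameColumnName(String oldName, String newName);

  // e.g. ALTER TABLE t ALTER COLUMN c TYPE bigint
  String changeColumnType(String columnName, String newType);

  // e.g. ALTER TABLE t ALTER COLUMN c COMMENT 'comment'
  String changeColumnsComment(String columnName, String comment);

  // e.g. ALTER TABLE t ALTER COLUMN c SET NOT NULL / DROP NOT NULL
  String changeColumnsRequire(String columnName, boolean required);
}

The practical upshot is that supporting DDL history for a new table format, as this PR does for Paimon, only requires a new TableMetaExtract implementation; the change detection and SQL generation stay shared.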