diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/ServerTableDescriptor.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/ServerTableDescriptor.java index 8e83900733..03d2036b5a 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/ServerTableDescriptor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/ServerTableDescriptor.java @@ -2,6 +2,7 @@ import com.netease.arctic.data.DataFileType; import com.netease.arctic.data.FileNameRules; +import com.netease.arctic.op.SnapshotSummary; import com.netease.arctic.server.dashboard.model.DDLInfo; import com.netease.arctic.server.dashboard.model.PartitionBaseInfo; import com.netease.arctic.server.dashboard.model.PartitionFileBaseInfo; @@ -14,7 +15,6 @@ import com.netease.arctic.server.table.ServerTableIdentifier; import com.netease.arctic.server.table.TableService; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.trace.SnapshotSummary; import com.netease.arctic.utils.ManifestEntryFields; import org.apache.commons.collections.CollectionUtils; import org.apache.iceberg.DataFile; @@ -36,7 +36,6 @@ import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -229,7 +228,8 @@ public List getOptimizingProcesses(String catalog, String } public List getOptimizingTasks(long processId) { - return getAs(OptimizingMapper.class, + return getAs( + OptimizingMapper.class, mapper -> mapper.selectOptimizeTaskMetas(Collections.singletonList(processId))); } @@ -239,7 +239,8 @@ public List getOptimizingTasks(List p } List processIds = processMetaList.stream() .map(OptimizingProcessMeta::getProcessId).collect(Collectors.toList()); - return getAs(OptimizingMapper.class, + return getAs( + OptimizingMapper.class, mapper -> mapper.selectOptimizeTaskMetas(processIds)); } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/KeyedTableCommit.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/KeyedTableCommit.java index 74fa8df3eb..6187f35f03 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/KeyedTableCommit.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/KeyedTableCommit.java @@ -5,11 +5,11 @@ import com.netease.arctic.data.PrimaryKeyedFile; import com.netease.arctic.hive.utils.TableTypeUtil; import com.netease.arctic.op.OverwriteBaseFiles; +import com.netease.arctic.op.SnapshotSummary; import com.netease.arctic.optimizing.RewriteFilesInput; import com.netease.arctic.optimizing.RewriteFilesOutput; import com.netease.arctic.server.exception.OptimizingCommitException; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.trace.SnapshotSummary; import com.netease.arctic.utils.ContentFiles; import com.netease.arctic.utils.TablePropertyUtil; import org.apache.commons.collections.CollectionUtils; @@ -23,13 +23,11 @@ import org.glassfish.jersey.internal.guava.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; - import static com.netease.arctic.hive.op.UpdateHiveFiles.DELETE_UNTRACKED_HIVE_FILE; import static com.netease.arctic.server.ArcticServiceConstants.INVALID_SNAPSHOT_ID; diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/UnKeyedTableCommit.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/UnKeyedTableCommit.java index ec4a899c9b..c8ac926319 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/UnKeyedTableCommit.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/UnKeyedTableCommit.java @@ -25,6 +25,7 @@ import com.netease.arctic.hive.utils.HivePartitionUtil; import com.netease.arctic.hive.utils.HiveTableUtil; import com.netease.arctic.hive.utils.TableTypeUtil; +import com.netease.arctic.op.SnapshotSummary; import com.netease.arctic.optimizing.OptimizingInputProperties; import com.netease.arctic.optimizing.RewriteFilesOutput; import com.netease.arctic.server.ArcticServiceConstants; @@ -32,7 +33,6 @@ import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.UnkeyedTable; -import com.netease.arctic.trace.SnapshotSummary; import com.netease.arctic.utils.ContentFiles; import com.netease.arctic.utils.TableFileUtil; import org.apache.commons.collections.CollectionUtils; @@ -52,7 +52,6 @@ import org.glassfish.jersey.internal.guava.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -63,7 +62,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - import static com.netease.arctic.hive.op.UpdateHiveFiles.DELETE_UNTRACKED_HIVE_FILE; import static com.netease.arctic.server.ArcticServiceConstants.INVALID_SNAPSHOT_ID; diff --git a/core/src/main/java/com/netease/arctic/catalog/BasicArcticCatalog.java b/core/src/main/java/com/netease/arctic/catalog/BasicArcticCatalog.java index c79ce8b2af..a1dc539128 100644 --- a/core/src/main/java/com/netease/arctic/catalog/BasicArcticCatalog.java +++ b/core/src/main/java/com/netease/arctic/catalog/BasicArcticCatalog.java @@ -29,14 +29,15 @@ import com.netease.arctic.io.ArcticFileIO; import com.netease.arctic.io.ArcticFileIOs; import com.netease.arctic.op.ArcticHadoopTableOperations; +import com.netease.arctic.op.CreateTableTransaction; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.PrimaryKeySpec; import com.netease.arctic.table.TableBuilder; import com.netease.arctic.table.TableIdentifier; +import com.netease.arctic.table.TableMetaStore; import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.blocker.BasicTableBlockerManager; import com.netease.arctic.table.blocker.TableBlockerManager; -import com.netease.arctic.trace.CreateTableTransaction; import com.netease.arctic.utils.CatalogUtil; import com.netease.arctic.utils.CompatiblePropertyUtil; import com.netease.arctic.utils.ConvertStructUtil; @@ -76,6 +77,7 @@ public class BasicArcticCatalog implements ArcticCatalog { protected CatalogMeta catalogMeta; protected Map customProperties; protected MixedTables tables; + protected transient TableMetaStore tableMetaStore; @Override public String name() { @@ -92,6 +94,7 @@ public void initialize( this.customProperties = properties; CatalogUtil.mergeCatalogProperties(catalogMeta, properties); tables = newMixedTables(catalogMeta); + tableMetaStore = CatalogUtil.buildMetaStore(meta); } protected MixedTables newMixedTables(CatalogMeta catalogMeta) { @@ -287,18 +290,14 @@ public ArcticTable create() { return table; } - protected ArcticTable createTableByMeta(TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, - PartitionSpec partitionSpec) { - return tables.createTableByMeta(tableMeta, schema, primaryKeySpec, partitionSpec); - } - - public Transaction newCreateTableTransaction() { - ArcticFileIO arcticFileIO = ArcticFileIOs.buildHadoopFileIO(tables.getTableMetaStore()); + @Override + public Transaction createTransaction() { + ArcticFileIO arcticFileIO = ArcticFileIOs.buildHadoopFileIO(tableMetaStore); ConvertStructUtil.TableMetaBuilder builder = createTableMataBuilder(); TableMeta meta = builder.build(); String location = getTableLocationForCreate(); TableOperations tableOperations = new ArcticHadoopTableOperations(new Path(location), - arcticFileIO, tables.getTableMetaStore().getConfiguration()); + arcticFileIO, tableMetaStore.getConfiguration()); TableMetadata tableMetadata = tableMetadata(schema, partitionSpec, sortOrder, properties, location); Transaction transaction = Transactions.createTableTransaction(identifier.getTableName(), tableOperations, tableMetadata); @@ -318,6 +317,12 @@ public Transaction newCreateTableTransaction() { ); } + protected ArcticTable createTableByMeta( + TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, + PartitionSpec partitionSpec) { + return tables.createTableByMeta(tableMeta, schema, primaryKeySpec, partitionSpec); + } + protected void doCreateCheck() { if (primaryKeySpec.primaryKeyExisted()) { primaryKeySpec.fieldNames().forEach(primaryKey -> { @@ -343,17 +348,20 @@ protected void doCreateCheck() { } protected void checkProperties() { - Map mergedProperties = CatalogUtil.mergeCatalogPropertiesToTable(properties, - catalogMeta.getCatalogProperties()); + Map mergedProperties = CatalogUtil.mergeCatalogPropertiesToTable( + properties, catalogMeta.getCatalogProperties()); boolean enableStream = CompatiblePropertyUtil.propertyAsBoolean(mergedProperties, TableProperties.ENABLE_LOG_STORE, TableProperties.ENABLE_LOG_STORE_DEFAULT); if (enableStream) { - Preconditions.checkArgument(mergedProperties.containsKey(TableProperties.LOG_STORE_MESSAGE_TOPIC), + Preconditions.checkArgument( + mergedProperties.containsKey(TableProperties.LOG_STORE_MESSAGE_TOPIC), "log-store.topic must not be null when log-store.enabled is true."); - Preconditions.checkArgument(mergedProperties.containsKey(TableProperties.LOG_STORE_ADDRESS), + Preconditions.checkArgument( + mergedProperties.containsKey(TableProperties.LOG_STORE_ADDRESS), "log-store.address must not be null when log-store.enabled is true."); String logStoreType = mergedProperties.get(LOG_STORE_TYPE); - Preconditions.checkArgument(logStoreType == null || + Preconditions.checkArgument( + logStoreType == null || logStoreType.equals(LOG_STORE_STORAGE_TYPE_KAFKA) || logStoreType.equals(LOG_STORE_STORAGE_TYPE_PULSAR), String.format( diff --git a/core/src/main/java/com/netease/arctic/catalog/IcebergCatalogWrapper.java b/core/src/main/java/com/netease/arctic/catalog/IcebergCatalogWrapper.java index 61921f9e9a..ae5d53fe72 100644 --- a/core/src/main/java/com/netease/arctic/catalog/IcebergCatalogWrapper.java +++ b/core/src/main/java/com/netease/arctic/catalog/IcebergCatalogWrapper.java @@ -37,6 +37,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -103,12 +104,11 @@ private void initialize(CatalogMeta meta, Map properties) { if (meta.getCatalogProperties().containsKey(CatalogMetaProperties.KEY_TABLE_FILTER)) { String tableFilter = - meta.getCatalogProperties().get(CatalogMetaProperties.KEY_TABLE_FILTER); + meta.getCatalogProperties().get(CatalogMetaProperties.KEY_TABLE_FILTER); tableFilterPattern = Pattern.compile(tableFilter); } else { tableFilterPattern = null; } - } public IcebergCatalogWrapper(CatalogMeta meta, Map properties) { @@ -279,6 +279,13 @@ public ArcticTable create() { return new BasicIcebergTable(identifier, table, arcticFileIO, meta.getCatalogProperties()); } + @Override + public Transaction createTransaction() { + return icebergCatalog.newCreateTableTransaction( + toIcebergTableIdentifier(identifier), schema, + spec, properties); + } + @Override public TableBuilder withPrimaryKeySpec(PrimaryKeySpec primaryKeySpec) { Preconditions.checkArgument( @@ -300,7 +307,7 @@ public BasicIcebergTable( Table icebergTable, ArcticFileIO arcticFileIO, Map catalogProperties) { - super(tableIdentifier, icebergTable, arcticFileIO, null, catalogProperties); + super(tableIdentifier, icebergTable, arcticFileIO, catalogProperties); } @Override diff --git a/core/src/main/java/com/netease/arctic/catalog/MixedTables.java b/core/src/main/java/com/netease/arctic/catalog/MixedTables.java index 1b8ed8e0de..7b605c34fe 100644 --- a/core/src/main/java/com/netease/arctic/catalog/MixedTables.java +++ b/core/src/main/java/com/netease/arctic/catalog/MixedTables.java @@ -1,6 +1,5 @@ package com.netease.arctic.catalog; -import com.netease.arctic.AmsClient; import com.netease.arctic.ams.api.CatalogMeta; import com.netease.arctic.ams.api.TableMeta; import com.netease.arctic.ams.api.properties.CatalogMetaProperties; @@ -32,6 +31,9 @@ import org.slf4j.LoggerFactory; import java.util.Map; +/** + * TODO: this class will be removed when we support using restCatalog as base store for InternalCatalog + */ public class MixedTables { private static final Logger LOG = LoggerFactory.getLogger(MixedTables.class); @@ -40,19 +42,8 @@ public class MixedTables { protected Tables tables; protected TableMetaStore tableMetaStore; - /** - * @deprecated since 0.5.0, will be removed in 0.6.0; - */ - @Deprecated - protected AmsClient amsClient; - public MixedTables(CatalogMeta catalogMeta) { - this(catalogMeta, null); - } - - public MixedTables(CatalogMeta catalogMeta, AmsClient amsClient) { initialize(catalogMeta); - this.amsClient = amsClient; } private void initialize(CatalogMeta meta) { @@ -94,13 +85,13 @@ protected KeyedTable loadKeyedTable(TableMeta tableMeta) { BaseTable baseTable = new BasicKeyedTable.BaseInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations( baseIcebergTable, baseLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, catalogMeta.getCatalogProperties()); Table changeIcebergTable = tableMetaStore.doAs(() -> tables.load(changeLocation)); ChangeTable changeTable = new BasicKeyedTable.ChangeInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, catalogMeta.getCatalogProperties()); PrimaryKeySpec keySpec = buildPrimaryKeySpec(baseTable.schema(), tableMeta); return new BasicKeyedTable(tableLocation, keySpec, baseTable, changeTable); } @@ -133,12 +124,11 @@ protected UnkeyedTable loadUnKeyedTable(TableMeta tableMeta) { tableIdentifier, tableLocation, tableMeta.getProperties(), tableMetaStore, catalogMeta.getCatalogProperties()); return new BasicUnkeyedTable(tableIdentifier, CatalogUtil.useArcticTableOperations(table, baseLocation, - fileIO, tableMetaStore.getConfiguration()), fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, tableMetaStore.getConfiguration()), fileIO, catalogMeta.getCatalogProperties()); } - public ArcticTable createTableByMeta( - TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, - PartitionSpec partitionSpec) { + public ArcticTable createTableByMeta(TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, + PartitionSpec partitionSpec) { if (primaryKeySpec.primaryKeyExisted()) { return createKeyedTable(tableMeta, schema, primaryKeySpec, partitionSpec); } else { @@ -146,9 +136,8 @@ public ArcticTable createTableByMeta( } } - protected KeyedTable createKeyedTable( - TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, - PartitionSpec partitionSpec) { + protected KeyedTable createKeyedTable(TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, + PartitionSpec partitionSpec) { TableIdentifier tableIdentifier = TableIdentifier.of(tableMeta.getTableIdentifier()); String tableLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_TABLE); String baseLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_BASE); @@ -168,7 +157,7 @@ protected KeyedTable createKeyedTable( BaseTable baseTable = new BasicKeyedTable.BaseInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(baseIcebergTable, baseLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, catalogMeta.getCatalogProperties()); Table changeIcebergTable = tableMetaStore.doAs(() -> { try { @@ -180,7 +169,7 @@ protected KeyedTable createKeyedTable( ChangeTable changeTable = new BasicKeyedTable.ChangeInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, catalogMeta.getCatalogProperties()); return new BasicKeyedTable(tableLocation, primaryKeySpec, baseTable, changeTable); } @@ -191,9 +180,8 @@ protected void fillTableProperties(TableMeta tableMeta) { tableMeta.putToProperties("flink.max-continuous-empty-commits", String.valueOf(Integer.MAX_VALUE)); } - protected UnkeyedTable createUnKeyedTable( - TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, - PartitionSpec partitionSpec) { + protected UnkeyedTable createUnKeyedTable(TableMeta tableMeta, Schema schema, PrimaryKeySpec primaryKeySpec, + PartitionSpec partitionSpec) { TableIdentifier tableIdentifier = TableIdentifier.of(tableMeta.getTableIdentifier()); String tableLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_TABLE); String baseLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_BASE); @@ -210,7 +198,7 @@ protected UnkeyedTable createUnKeyedTable( tableIdentifier, tableLocation, tableMeta.getProperties(), tableMetaStore, catalogMeta.getCatalogProperties()); return new BasicUnkeyedTable(tableIdentifier, CatalogUtil.useArcticTableOperations(table, baseLocation, fileIO, - tableMetaStore.getConfiguration()), fileIO, amsClient, catalogMeta.getCatalogProperties()); + tableMetaStore.getConfiguration()), fileIO, catalogMeta.getCatalogProperties()); } public void dropTableByMeta(TableMeta tableMeta, boolean purge) { diff --git a/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java b/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java index f4e56739bf..a69ad0ae55 100644 --- a/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java +++ b/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java @@ -24,6 +24,7 @@ import com.netease.arctic.catalog.ArcticCatalog; import com.netease.arctic.io.ArcticFileIO; import com.netease.arctic.io.TableTrashManagers; +import com.netease.arctic.op.CreateTableTransaction; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.PrimaryKeySpec; import com.netease.arctic.table.TableBuilder; @@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.thrift.TException; + import java.util.List; import java.util.Map; import java.util.Set; @@ -304,8 +306,15 @@ public ArcticTable create() { } @Override - public Transaction newCreateTableTransaction() { - return null; + public Transaction createTransaction() { + Transaction transaction = icebergCatalog.newCreateTableTransaction( + org.apache.iceberg.catalog.TableIdentifier.of(identifier.getDatabase(), identifier.getTableName()), + schema, partitionSpec, properties); + return new CreateTableTransaction( + transaction, + this::create, + () -> dropTable(identifier, true) + ); } } } diff --git a/core/src/main/java/com/netease/arctic/mixed/MixedTables.java b/core/src/main/java/com/netease/arctic/mixed/MixedTables.java index 5cae313708..65d0055a9f 100644 --- a/core/src/main/java/com/netease/arctic/mixed/MixedTables.java +++ b/core/src/main/java/com/netease/arctic/mixed/MixedTables.java @@ -17,7 +17,7 @@ */ package com.netease.arctic.mixed; - + import com.netease.arctic.ams.api.CatalogMeta; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.ams.api.properties.CatalogMetaProperties; @@ -99,13 +99,13 @@ public ArcticTable loadTable(Table base, com.netease.arctic.table.TableIdentifie ArcticFileIO io = ArcticFileIOs.buildAdaptIcebergFileIO(this.tableMetaStore, base.io()); PrimaryKeySpec keySpec = getPrimaryKeySpec(base); if (!keySpec.primaryKeyExisted()) { - return new BasicUnkeyedTable(tableIdentifier, base, io, null, catalogMeta.getCatalogProperties()); + return new BasicUnkeyedTable(tableIdentifier, base, io, catalogMeta.getCatalogProperties()); } Table changeIcebergTable = loadChangeStore(base); BaseTable baseStore = new BasicKeyedTable.BaseInternalTable( - tableIdentifier, base, io, null, catalogMeta.getCatalogProperties()); + tableIdentifier, base, io, catalogMeta.getCatalogProperties()); ChangeTable changeStore = new BasicKeyedTable.ChangeInternalTable( - tableIdentifier, changeIcebergTable, io, null, catalogMeta.getCatalogProperties()); + tableIdentifier, changeIcebergTable, io, catalogMeta.getCatalogProperties()); return new BasicKeyedTable(keySpec, baseStore, changeStore); } @@ -132,7 +132,7 @@ public ArcticTable createTable( ArcticFileIO io = ArcticFileIOs.buildAdaptIcebergFileIO(this.tableMetaStore, base.io()); if (!keySpec.primaryKeyExisted()) { - return new BasicUnkeyedTable(identifier, base, io, null, catalogMeta.getCatalogProperties()); + return new BasicUnkeyedTable(identifier, base, io, catalogMeta.getCatalogProperties()); } Catalog.TableBuilder changeBuilder = icebergCatalog.buildTable(changeIdentifier, schema) @@ -141,9 +141,9 @@ public ArcticTable createTable( .withProperty(TableProperties.MIXED_FORMAT_TABLE_STORE, TableProperties.MIXED_FORMAT_TABLE_STORE_CHANGE); Table change = tableMetaStore.doAs(changeBuilder::create); BaseTable baseStore = new BasicKeyedTable.BaseInternalTable( - identifier, base, io, null, catalogMeta.getCatalogProperties()); + identifier, base, io, catalogMeta.getCatalogProperties()); ChangeTable changeStore = new BasicKeyedTable.ChangeInternalTable( - identifier, change, io, null, catalogMeta.getCatalogProperties()); + identifier, change, io, catalogMeta.getCatalogProperties()); return new BasicKeyedTable(keySpec, baseStore, changeStore); } diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticAppendFiles.java b/core/src/main/java/com/netease/arctic/op/ArcticAppendFiles.java similarity index 74% rename from core/src/main/java/com/netease/arctic/trace/ArcticAppendFiles.java rename to core/src/main/java/com/netease/arctic/op/ArcticAppendFiles.java index f88519f975..9dd5c43892 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticAppendFiles.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticAppendFiles.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,18 +16,14 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; -import com.netease.arctic.AmsClient; -import com.netease.arctic.op.ArcticUpdate; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; - import java.util.function.Supplier; /** @@ -41,14 +37,15 @@ public static Builder buildFor(ArcticTable table, boolean fastAppend) { return new Builder(table, fastAppend); } - private ArcticAppendFiles(ArcticTable arcticTable, AppendFiles appendFiles, TableTracer tracer) { - super(arcticTable, appendFiles, tracer); + private ArcticAppendFiles(ArcticTable arcticTable, AppendFiles appendFiles) { + super(arcticTable, appendFiles); this.appendFiles = appendFiles; } - private ArcticAppendFiles(ArcticTable arcticTable, AppendFiles appendFiles, TableTracer tracer, + private ArcticAppendFiles( + ArcticTable arcticTable, AppendFiles appendFiles, Transaction transaction, boolean autoCommitTransaction) { - super(arcticTable, appendFiles, tracer, transaction, autoCommitTransaction); + super(arcticTable, appendFiles, transaction, autoCommitTransaction); this.appendFiles = appendFiles; } @@ -82,9 +79,8 @@ private Builder(ArcticTable table, boolean fastAppend) { } @Override - protected ArcticAppendFiles updateWithWatermark( - TableTracer tableTracer, Transaction transaction, boolean autoCommitTransaction) { - return new ArcticAppendFiles(table, newAppendFiles(transaction), tableTracer, transaction, autoCommitTransaction); + protected ArcticAppendFiles updateWithWatermark(Transaction transaction, boolean autoCommitTransaction) { + return new ArcticAppendFiles(table, newAppendFiles(transaction), transaction, autoCommitTransaction); } @Override @@ -98,18 +94,8 @@ protected Supplier tableStoreDelegateSupplier(Table tableStore) { } @Override - protected ArcticAppendFiles updateWithoutWatermark( - TableTracer tableTracer, Supplier delegateSupplier) { - return new ArcticAppendFiles(table, delegateSupplier.get(), tableTracer); - } - - @Override - public ArcticUpdate.Builder traceTable(AmsClient client, UnkeyedTable traceTable) { - if (client != null) { - TableTracer tracer = new AmsTableTracer(traceTable, TraceOperations.APPEND, client, true); - traceTable(tracer); - } - return this; + protected ArcticAppendFiles updateWithoutWatermark(Supplier delegateSupplier) { + return new ArcticAppendFiles(table, delegateSupplier.get()); } private AppendFiles newAppendFiles(Transaction transaction) { diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticDeleteFiles.java b/core/src/main/java/com/netease/arctic/op/ArcticDeleteFiles.java similarity index 77% rename from core/src/main/java/com/netease/arctic/trace/ArcticDeleteFiles.java rename to core/src/main/java/com/netease/arctic/op/ArcticDeleteFiles.java index d924a36d40..049902b466 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticDeleteFiles.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticDeleteFiles.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,23 +16,16 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; -import com.netease.arctic.AmsClient; -import com.netease.arctic.op.ArcticUpdate; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.expressions.Expression; - import java.util.function.Supplier; -/** - * Wrap {@link DeleteFiles} with {@link TableTracer}. - */ public class ArcticDeleteFiles extends ArcticUpdate implements DeleteFiles { private final DeleteFiles deleteFiles; @@ -41,8 +34,8 @@ public static Builder buildFor(ArcticTable table) { return new Builder(table); } - protected ArcticDeleteFiles(ArcticTable table, DeleteFiles deleteFiles, TableTracer tracer) { - super(table, deleteFiles, tracer); + protected ArcticDeleteFiles(ArcticTable table, DeleteFiles deleteFiles) { + super(table, deleteFiles); this.deleteFiles = deleteFiles; } @@ -81,27 +74,16 @@ protected Builder(ArcticTable table) { super(table); } - @Override - public ArcticUpdate.Builder traceTable(AmsClient client, UnkeyedTable traceTable) { - if (client != null) { - TableTracer tracer = new AmsTableTracer(traceTable, TraceOperations.DELETE, client, true); - traceTable(tracer); - } - return this; - } - @Override protected ArcticDeleteFiles updateWithWatermark( - TableTracer tableTracer, Transaction transaction, boolean autoCommitTransaction) { - return new ArcticDeleteFiles(table, transaction.newDelete(), tableTracer); + return new ArcticDeleteFiles(table, transaction.newDelete()); } @Override - protected ArcticDeleteFiles updateWithoutWatermark( - TableTracer tableTracer, Supplier delegateSupplier) { - return new ArcticDeleteFiles(table, delegateSupplier.get(), tableTracer); + protected ArcticDeleteFiles updateWithoutWatermark(Supplier delegateSupplier) { + return new ArcticDeleteFiles(table, delegateSupplier.get()); } @Override @@ -113,6 +95,5 @@ protected Supplier transactionDelegateSupplier(Transaction transact protected Supplier tableStoreDelegateSupplier(Table tableStore) { return tableStore::newDelete; } - } } diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticOverwriteFiles.java b/core/src/main/java/com/netease/arctic/op/ArcticOverwriteFiles.java similarity index 75% rename from core/src/main/java/com/netease/arctic/trace/ArcticOverwriteFiles.java rename to core/src/main/java/com/netease/arctic/op/ArcticOverwriteFiles.java index 4882bee6b5..2f38c6d794 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticOverwriteFiles.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticOverwriteFiles.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,23 +16,16 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; -import com.netease.arctic.AmsClient; -import com.netease.arctic.op.ArcticUpdate; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.expressions.Expression; - import java.util.function.Supplier; -/** - * Wrap {@link OverwriteFiles} with {@link TableTracer}. - */ public class ArcticOverwriteFiles extends ArcticUpdate implements OverwriteFiles { private final OverwriteFiles overwriteFiles; @@ -41,14 +34,15 @@ public static ArcticOverwriteFiles.Builder buildFor(ArcticTable table) { return new ArcticOverwriteFiles.Builder(table); } - private ArcticOverwriteFiles(ArcticTable arcticTable, OverwriteFiles overwriteFiles, TableTracer tracer) { - super(arcticTable, overwriteFiles, tracer); + private ArcticOverwriteFiles(ArcticTable arcticTable, OverwriteFiles overwriteFiles) { + super(arcticTable, overwriteFiles); this.overwriteFiles = overwriteFiles; } - private ArcticOverwriteFiles(ArcticTable arcticTable, OverwriteFiles overwriteFiles, TableTracer tracer, + private ArcticOverwriteFiles( + ArcticTable arcticTable, OverwriteFiles overwriteFiles, Transaction transaction, boolean autoCommitTransaction) { - super(arcticTable, overwriteFiles, tracer, transaction, autoCommitTransaction); + super(arcticTable, overwriteFiles, transaction, autoCommitTransaction); this.overwriteFiles = overwriteFiles; } @@ -121,26 +115,13 @@ private Builder(ArcticTable table) { } @Override - public ArcticUpdate.Builder traceTable( - AmsClient client, UnkeyedTable traceTable) { - if (client != null) { - TableTracer tracer = new AmsTableTracer(traceTable, TraceOperations.OVERWRITE, client, true); - traceTable(tracer); - } - return this; - } - - @Override - protected ArcticOverwriteFiles updateWithWatermark( - TableTracer tableTracer, Transaction transaction, boolean autoCommitTransaction) { - return new ArcticOverwriteFiles(table, transaction.newOverwrite(), - tableTracer, transaction, autoCommitTransaction); + protected ArcticOverwriteFiles updateWithWatermark(Transaction transaction, boolean autoCommitTransaction) { + return new ArcticOverwriteFiles(table, transaction.newOverwrite(), transaction, autoCommitTransaction); } @Override - protected ArcticOverwriteFiles updateWithoutWatermark( - TableTracer tableTracer, Supplier delegateSupplier) { - return new ArcticOverwriteFiles(table, delegateSupplier.get(), tableTracer); + protected ArcticOverwriteFiles updateWithoutWatermark(Supplier delegateSupplier) { + return new ArcticOverwriteFiles(table, delegateSupplier.get()); } @Override diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticReplacePartitions.java b/core/src/main/java/com/netease/arctic/op/ArcticReplacePartitions.java similarity index 74% rename from core/src/main/java/com/netease/arctic/trace/ArcticReplacePartitions.java rename to core/src/main/java/com/netease/arctic/op/ArcticReplacePartitions.java index a1e814e53f..814a536151 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticReplacePartitions.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticReplacePartitions.java @@ -16,17 +16,13 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; -import com.netease.arctic.AmsClient; -import com.netease.arctic.op.ArcticUpdate; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; - import java.util.function.Supplier; public class ArcticReplacePartitions extends ArcticUpdate implements ReplacePartitions { @@ -37,14 +33,15 @@ public static ArcticReplacePartitions.Builder buildFor(ArcticTable table) { return new ArcticReplacePartitions.Builder(table); } - private ArcticReplacePartitions(ArcticTable arcticTable, ReplacePartitions replacePartitions, TableTracer tracer) { - super(arcticTable, replacePartitions, tracer); + private ArcticReplacePartitions(ArcticTable arcticTable, ReplacePartitions replacePartitions) { + super(arcticTable, replacePartitions); this.replacePartitions = replacePartitions; } - private ArcticReplacePartitions(ArcticTable arcticTable, ReplacePartitions replacePartitions, TableTracer tracer, + private ArcticReplacePartitions( + ArcticTable arcticTable, ReplacePartitions replacePartitions, Transaction transaction, boolean autoCommitTransaction) { - super(arcticTable, replacePartitions, tracer, transaction, autoCommitTransaction); + super(arcticTable, replacePartitions, transaction, autoCommitTransaction); this.replacePartitions = replacePartitions; } @@ -79,7 +76,6 @@ public ReplacePartitions validateNoConflictingData() { return this; } - @Override protected ReplacePartitions self() { return this; @@ -93,27 +89,14 @@ private Builder(ArcticTable table) { } @Override - public ArcticUpdate.Builder traceTable( - AmsClient client, UnkeyedTable traceTable) { - if (client != null) { - TableTracer tracer = new AmsTableTracer(traceTable, TraceOperations.OVERWRITE, client, true); - traceTable(tracer); - } - return this; - } - - @Override - protected ArcticReplacePartitions updateWithWatermark( - TableTracer tableTracer, Transaction transaction, boolean autoCommitTransaction) { - return new ArcticReplacePartitions(table, transaction.newReplacePartitions(), - tableTracer, transaction, autoCommitTransaction); + protected ArcticReplacePartitions updateWithWatermark(Transaction transaction, boolean autoCommitTransaction) { + return new ArcticReplacePartitions(table, transaction.newReplacePartitions(), transaction, autoCommitTransaction); } @Override protected ArcticReplacePartitions updateWithoutWatermark( - TableTracer tableTracer, Supplier delegateSupplier) { - return new ArcticReplacePartitions(table, delegateSupplier.get(), tableTracer); + return new ArcticReplacePartitions(table, delegateSupplier.get()); } @Override diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticRewriteFiles.java b/core/src/main/java/com/netease/arctic/op/ArcticRewriteFiles.java similarity index 80% rename from core/src/main/java/com/netease/arctic/trace/ArcticRewriteFiles.java rename to core/src/main/java/com/netease/arctic/op/ArcticRewriteFiles.java index 43dbd9b797..702d0fd741 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticRewriteFiles.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticRewriteFiles.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,24 +16,17 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; -import com.netease.arctic.AmsClient; -import com.netease.arctic.op.ArcticUpdate; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; - import java.util.Set; import java.util.function.Supplier; -/** - * Wrap {@link RewriteFiles} with {@link TableTracer}. - */ public class ArcticRewriteFiles extends ArcticUpdate implements RewriteFiles { private final RewriteFiles rewriteFiles; @@ -41,8 +34,8 @@ public static Builder buildFor(ArcticTable table) { return new Builder(table); } - protected ArcticRewriteFiles(ArcticTable table, RewriteFiles rewriteFiles, TableTracer tracer) { - super(table, rewriteFiles, tracer); + protected ArcticRewriteFiles(ArcticTable table, RewriteFiles rewriteFiles) { + super(table, rewriteFiles); this.rewriteFiles = rewriteFiles; } @@ -91,28 +84,16 @@ private Builder(ArcticTable table) { super(table); } - @Override - public ArcticUpdate.Builder traceTable( - AmsClient client, UnkeyedTable traceTable) { - if (client != null) { - TableTracer tracer = new AmsTableTracer(traceTable, TraceOperations.REPLACE, client, true); - traceTable(tracer); - } - return this; - } - @Override protected ArcticRewriteFiles updateWithWatermark( - TableTracer tableTracer, Transaction transaction, boolean autoCommitTransaction) { - return new ArcticRewriteFiles(table, transaction.newRewrite(), tableTracer); + return new ArcticRewriteFiles(table, transaction.newRewrite()); } @Override - protected ArcticRewriteFiles updateWithoutWatermark( - TableTracer tableTracer, Supplier delegateSupplier) { - return new ArcticRewriteFiles(table, delegateSupplier.get(), tableTracer); + protected ArcticRewriteFiles updateWithoutWatermark(Supplier delegateSupplier) { + return new ArcticRewriteFiles(table, delegateSupplier.get()); } @Override @@ -124,6 +105,5 @@ protected Supplier transactionDelegateSupplier(Transaction transac protected Supplier tableStoreDelegateSupplier(Table tableStore) { return tableStore::newRewrite; } - } } diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticRowDelta.java b/core/src/main/java/com/netease/arctic/op/ArcticRowDelta.java similarity index 76% rename from core/src/main/java/com/netease/arctic/trace/ArcticRowDelta.java rename to core/src/main/java/com/netease/arctic/op/ArcticRowDelta.java index f458c1bb3e..64d3359b4e 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticRowDelta.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticRowDelta.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,25 +16,18 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; -import com.netease.arctic.AmsClient; -import com.netease.arctic.op.ArcticUpdate; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.expressions.Expression; - import java.util.function.Supplier; -/** - * Wrap {@link RowDelta} with {@link TableTracer}. - */ public class ArcticRowDelta extends ArcticUpdate implements RowDelta { private final RowDelta rowDelta; @@ -43,14 +36,14 @@ public static ArcticRowDelta.Builder buildFor(ArcticTable table) { return new ArcticRowDelta.Builder(table); } - private ArcticRowDelta(ArcticTable arcticTable, RowDelta rowDelta, TableTracer tracer) { - super(arcticTable, rowDelta, tracer); + private ArcticRowDelta(ArcticTable arcticTable, RowDelta rowDelta) { + super(arcticTable, rowDelta); this.rowDelta = rowDelta; } - private ArcticRowDelta(ArcticTable arcticTable, RowDelta rowDelta, TableTracer tracer, + private ArcticRowDelta(ArcticTable arcticTable, RowDelta rowDelta, Transaction transaction, boolean autoCommitTransaction) { - super(arcticTable, rowDelta, tracer, transaction, autoCommitTransaction); + super(arcticTable, rowDelta, transaction, autoCommitTransaction); this.rowDelta = rowDelta; } @@ -122,26 +115,15 @@ private Builder(ArcticTable table) { generateWatermark(); } - @Override - public ArcticUpdate.Builder traceTable( - AmsClient client, UnkeyedTable traceTable) { - if (client != null) { - TableTracer tracer = new AmsTableTracer(traceTable, TraceOperations.OVERWRITE, client, true); - traceTable(tracer); - } - return this; - } @Override - protected ArcticRowDelta updateWithWatermark( - TableTracer tableTracer, Transaction transaction, boolean autoCommitTransaction) { - return new ArcticRowDelta(table, transaction.newRowDelta(), - tableTracer, transaction, autoCommitTransaction); + protected ArcticRowDelta updateWithWatermark(Transaction transaction, boolean autoCommitTransaction) { + return new ArcticRowDelta(table, transaction.newRowDelta(), transaction, autoCommitTransaction); } @Override - protected ArcticRowDelta updateWithoutWatermark(TableTracer tableTracer, Supplier delegateSupplier) { - return new ArcticRowDelta(table, delegateSupplier.get(), tableTracer); + protected ArcticRowDelta updateWithoutWatermark(Supplier delegateSupplier) { + return new ArcticRowDelta(table, delegateSupplier.get()); } @Override diff --git a/core/src/main/java/com/netease/arctic/trace/ArcticTransaction.java b/core/src/main/java/com/netease/arctic/op/ArcticTransaction.java similarity index 70% rename from core/src/main/java/com/netease/arctic/trace/ArcticTransaction.java rename to core/src/main/java/com/netease/arctic/op/ArcticTransaction.java index ad5877cdc5..43d3013f8b 100644 --- a/core/src/main/java/com/netease/arctic/trace/ArcticTransaction.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticTransaction.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,13 +16,16 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; +import com.netease.arctic.op.ArcticAppendFiles; +import com.netease.arctic.op.ArcticDeleteFiles; +import com.netease.arctic.op.ArcticOverwriteFiles; +import com.netease.arctic.op.ArcticReplacePartitions; +import com.netease.arctic.op.ArcticRewriteFiles; +import com.netease.arctic.op.ArcticRowDelta; import com.netease.arctic.table.ArcticTable; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataOperations; -import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.HasTableOperations; @@ -51,28 +54,20 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; - import java.io.Serializable; import java.util.List; import java.util.Map; -import java.util.Optional; -/** - * Wrap {@link Transaction} with {@link TableTracer}. - */ public class ArcticTransaction implements Transaction { private final ArcticTable arcticTable; private final Transaction transaction; - private final AmsTableTracer tracer; private final Table transactionTable; - public ArcticTransaction(ArcticTable arcticTable, Transaction transaction, AmsTableTracer tracer) { + public ArcticTransaction(ArcticTable arcticTable, Transaction transaction) { this.arcticTable = arcticTable; this.transaction = transaction; - this.tracer = tracer; - this.transactionTable = new TransactionTable(); } @@ -83,13 +78,7 @@ public Table table() { @Override public UpdateSchema updateSchema() { - UpdateSchema updateSchema = transaction.updateSchema(); - if (tracer != null) { - tracer.setAction(TraceOperations.UPDATE_SCHEMA); - return new TracedSchemaUpdate(updateSchema, new TransactionTracker()); - } else { - return updateSchema; - } + return transaction.updateSchema(); } @Override @@ -99,13 +88,7 @@ public UpdatePartitionSpec updateSpec() { @Override public UpdateProperties updateProperties() { - UpdateProperties updateProperties = transaction.updateProperties(); - if (tracer != null) { - tracer.setAction(TraceOperations.UPDATE_PROPERTIES); - return new TracedUpdateProperties(updateProperties, new TransactionTracker()); - } else { - return updateProperties; - } + return transaction.updateProperties(); } @Override @@ -120,23 +103,17 @@ public UpdateLocation updateLocation() { @Override public AppendFiles newAppend() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.APPEND)); - return ArcticAppendFiles.buildFor(arcticTable, false).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticAppendFiles.buildFor(arcticTable, false).inTransaction(transaction).build(); } @Override public AppendFiles newFastAppend() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.APPEND)); - return ArcticAppendFiles.buildFor(arcticTable, true).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticAppendFiles.buildFor(arcticTable, true).inTransaction(transaction).build(); } @Override public RewriteFiles newRewrite() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.REPLACE)); - return ArcticRewriteFiles.buildFor(arcticTable).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticRewriteFiles.buildFor(arcticTable).inTransaction(transaction).build(); } @Override @@ -146,30 +123,22 @@ public RewriteManifests rewriteManifests() { @Override public OverwriteFiles newOverwrite() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.OVERWRITE)); - return ArcticOverwriteFiles.buildFor(arcticTable).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticOverwriteFiles.buildFor(arcticTable).inTransaction(transaction).build(); } @Override public RowDelta newRowDelta() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.OVERWRITE)); - return ArcticRowDelta.buildFor(arcticTable).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticRowDelta.buildFor(arcticTable).inTransaction(transaction).build(); } @Override public ReplacePartitions newReplacePartitions() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.OVERWRITE)); - return ArcticReplacePartitions.buildFor(arcticTable).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticReplacePartitions.buildFor(arcticTable).inTransaction(transaction).build(); } @Override public DeleteFiles newDelete() { - tableTracer().ifPresent(tracer -> tracer.setAction(DataOperations.DELETE)); - return ArcticDeleteFiles.buildFor(arcticTable).inTransaction(transaction) - .traceTable(transactionTracer()).build(); + return ArcticDeleteFiles.buildFor(arcticTable).inTransaction(transaction).build(); } @Override @@ -180,70 +149,6 @@ public ExpireSnapshots expireSnapshots() { @Override public void commitTransaction() { transaction.commitTransaction(); - tableTracer().ifPresent(AmsTableTracer::commit); - } - - private Optional tableTracer() { - return Optional.ofNullable(tracer); - } - - private TableTracer transactionTracer() { - if (tracer != null) { - return new TransactionTracker(); - } else { - return null; - } - } - - class TransactionTracker implements TableTracer { - - AmsTableTracer.InternalTableChange internalTableChange; - - public TransactionTracker() { - internalTableChange = new AmsTableTracer.InternalTableChange(); - } - - @Override - public void addDataFile(DataFile dataFile) { - internalTableChange.addDataFile(dataFile); - } - - @Override - public void deleteDataFile(DataFile dataFile) { - internalTableChange.deleteDataFile(dataFile); - } - - @Override - public void addDeleteFile(DeleteFile deleteFile) { - internalTableChange.addDeleteFile(deleteFile); - } - - @Override - public void deleteDeleteFile(DeleteFile deleteFile) { - internalTableChange.deleteDeleteFile(deleteFile); - } - - @Override - public void commit() { - if (transaction.table().currentSnapshot() != null) { - tracer.addTransactionTableSnapshot(transaction.table().currentSnapshot().snapshotId(), internalTableChange); - } - } - - @Override - public void replaceProperties(Map newProperties) { - tracer.replaceProperties(newProperties); - } - - @Override - public void setSnapshotSummary(String key, String value) { - tracer.setSnapshotSummary(key, value); - } - - @Override - public void updateColumn(UpdateColumn updateColumn) { - tracer.updateColumn(updateColumn); - } } class TransactionTable implements Table, HasTableOperations, Serializable { diff --git a/core/src/main/java/com/netease/arctic/op/ArcticUpdate.java b/core/src/main/java/com/netease/arctic/op/ArcticUpdate.java index 4d032fb449..6a8f9047ee 100644 --- a/core/src/main/java/com/netease/arctic/op/ArcticUpdate.java +++ b/core/src/main/java/com/netease/arctic/op/ArcticUpdate.java @@ -18,12 +18,9 @@ package com.netease.arctic.op; -import com.netease.arctic.AmsClient; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableProperties; -import com.netease.arctic.table.UnkeyedTable; import com.netease.arctic.table.WatermarkGenerator; -import com.netease.arctic.trace.TableTracer; import com.netease.arctic.utils.TablePropertyUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -34,8 +31,6 @@ import org.apache.iceberg.Transaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Supplier; @@ -52,14 +47,12 @@ public abstract class ArcticUpdate implements SnapshotUpdate { protected final SnapshotUpdate delegate; private final ArcticTable arcticTable; - private final TableTracer tracer; protected final Transaction transaction; protected final boolean autoCommitTransaction; protected final WatermarkGenerator watermarkGenerator; - public ArcticUpdate(ArcticTable arcticTable, SnapshotUpdate delegate, TableTracer tracer) { + public ArcticUpdate(ArcticTable arcticTable, SnapshotUpdate delegate) { this.arcticTable = arcticTable; - this.tracer = tracer; this.transaction = null; this.autoCommitTransaction = false; this.watermarkGenerator = null; @@ -67,10 +60,9 @@ public ArcticUpdate(ArcticTable arcticTable, SnapshotUpdate delegate, TableTr } public ArcticUpdate( - ArcticTable arcticTable, SnapshotUpdate delegate, TableTracer tracer, Transaction transaction, + ArcticTable arcticTable, SnapshotUpdate delegate, Transaction transaction, boolean autoCommitTransaction) { this.arcticTable = arcticTable; - this.tracer = tracer; this.transaction = transaction; this.autoCommitTransaction = autoCommitTransaction; WatermarkGenerator watermarkGenerator = null; @@ -83,45 +75,25 @@ public ArcticUpdate( this.delegate = delegate; } - protected Optional tracer() { - if (tracer != null) { - return Optional.of(tracer); - } else { - return Optional.empty(); - } - } - protected void addIcebergDataFile(DataFile file) { - if (tracer != null) { - tracer.addDataFile(file); - } if (watermarkGenerator != null) { watermarkGenerator.addFile(file); } } protected void deleteIcebergDataFile(DataFile file) { - if (tracer != null) { - tracer.deleteDataFile(file); - } if (watermarkGenerator != null) { watermarkGenerator.addFile(file); } } protected void addIcebergDeleteFile(DeleteFile file) { - if (tracer != null) { - tracer.addDeleteFile(file); - } if (watermarkGenerator != null) { watermarkGenerator.addFile(file); } } protected void deleteIcebergDeleteFile(DeleteFile file) { - if (tracer != null) { - tracer.deleteDeleteFile(file); - } if (watermarkGenerator != null) { watermarkGenerator.addFile(file); } @@ -130,7 +102,6 @@ protected void deleteIcebergDeleteFile(DeleteFile file) { @Override public T set(String property, String value) { this.delegate.set(property, value); - tracer().ifPresent(tracer -> tracer.setSnapshotSummary(property, value)); return this.self(); } @@ -177,9 +148,6 @@ public void commit() { if (transaction != null && autoCommitTransaction) { transaction.commitTransaction(); } - if (tracer != null) { - tracer.commit(); - } } @Override @@ -192,7 +160,6 @@ public abstract static class Builder { protected final ArcticTable table; protected Table tableStore; - protected TableTracer tableTracer; protected boolean onChangeStore = false; protected Transaction insideTransaction; protected boolean generateWatermark = false; @@ -221,13 +188,6 @@ public Builder generateWatermark() { return this; } - public Builder traceTable(TableTracer tableTracer) { - this.tableTracer = tableTracer; - return this; - } - - public abstract Builder traceTable(AmsClient client, UnkeyedTable traceTable); - protected Table getTableStore() { if (tableStore == null) { if (table.isKeyedTable()) { @@ -247,25 +207,24 @@ public T build() { Table tableStore = getTableStore(); if (generateWatermark) { if (insideTransaction != null) { - return updateWithWatermark(tableTracer, insideTransaction, false); + return updateWithWatermark(insideTransaction, false); } else { Transaction transaction = tableStore.newTransaction(); - return updateWithWatermark(tableTracer, transaction, true); + return updateWithWatermark(transaction, true); } } else { if (insideTransaction != null) { - return updateWithoutWatermark(tableTracer, transactionDelegateSupplier(insideTransaction)); + return updateWithoutWatermark(transactionDelegateSupplier(insideTransaction)); } else { - return updateWithoutWatermark(tableTracer, tableStoreDelegateSupplier(tableStore)); + return updateWithoutWatermark(tableStoreDelegateSupplier(tableStore)); } } } - protected abstract T updateWithWatermark( - TableTracer tableTracer, Transaction transaction, + protected abstract T updateWithWatermark(Transaction transaction, boolean autoCommitTransaction); - protected abstract T updateWithoutWatermark(TableTracer tableTracer, Supplier delegateSupplier); + protected abstract T updateWithoutWatermark(Supplier delegateSupplier); protected abstract Supplier transactionDelegateSupplier(Transaction transaction); diff --git a/core/src/main/java/com/netease/arctic/op/CreateTableTransaction.java b/core/src/main/java/com/netease/arctic/op/CreateTableTransaction.java new file mode 100644 index 0000000000..3c68c854f5 --- /dev/null +++ b/core/src/main/java/com/netease/arctic/op/CreateTableTransaction.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * * + * http://www.apache.org/licenses/LICENSE-2.0 + * * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.op; + +import com.netease.arctic.table.ArcticTable; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.ReplaceSortOrder; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RewriteManifests; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateLocation; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class CreateTableTransaction implements Transaction { + private final List appendDataFiles = Lists.newArrayList(); + private final Supplier tableCreator; + private final Runnable rollback; + private final Transaction transaction; + + public CreateTableTransaction(Transaction transaction, Supplier tableSupplier, Runnable rollback) { + this.transaction = transaction; + this.tableCreator = tableSupplier; + this.rollback = rollback; + } + + @Override + public Table table() { + return transaction.table(); + } + + @Override + public UpdateSchema updateSchema() { + throw new UnsupportedOperationException("create table transaction unsupported updateSchema"); + } + + @Override + public UpdatePartitionSpec updateSpec() { + throw new UnsupportedOperationException("create table transaction unsupported updateSpec"); + } + + @Override + public UpdateProperties updateProperties() { + throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); + } + + @Override + public ReplaceSortOrder replaceSortOrder() { + throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); + } + + @Override + public UpdateLocation updateLocation() { + throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); + } + + @Override + public AppendFiles newAppend() { + return new AppendFiles() { + private final List dataFiles = Lists.newArrayList(); + + @Override + public AppendFiles appendFile(DataFile dataFile) { + dataFiles.add(dataFile); + return this; + } + + @Override + public AppendFiles appendManifest(ManifestFile file) { + throw new UnsupportedOperationException("create table transaction AppendFiles unsupported ManifestFile"); + } + + @Override + public AppendFiles set(String property, String value) { + throw new UnsupportedOperationException("create table transaction AppendFiles unsupported set"); + } + + @Override + public AppendFiles deleteWith(Consumer deleteFunc) { + throw new UnsupportedOperationException("create table transaction AppendFiles unsupported deleteWith"); + } + + @Override + public AppendFiles stageOnly() { + throw new UnsupportedOperationException("create table transaction AppendFiles unsupported stageOnly"); + } + + @Override + public AppendFiles scanManifestsWith(ExecutorService executorService) { + throw new UnsupportedOperationException("create table transaction AppendFiles unsupported scanManifestsWith"); + } + + @Override + public Snapshot apply() { + throw new UnsupportedOperationException("create table transaction AppendFiles unsupported apply"); + } + + @Override + public void commit() { + appendDataFiles.addAll(this.dataFiles); + } + }; + } + + @Override + public RewriteFiles newRewrite() { + throw new UnsupportedOperationException("create table transaction unsupported newRewrite"); + } + + @Override + public RewriteManifests rewriteManifests() { + throw new UnsupportedOperationException("create table transaction unsupported rewriteManifests"); + } + + @Override + public OverwriteFiles newOverwrite() { + throw new UnsupportedOperationException("create table transaction unsupported newOverwrite"); + } + + @Override + public RowDelta newRowDelta() { + throw new UnsupportedOperationException("create table transaction unsupported newRowDelta"); + } + + @Override + public ReplacePartitions newReplacePartitions() { + throw new UnsupportedOperationException("create table transaction unsupported newReplacePartitions"); + } + + @Override + public DeleteFiles newDelete() { + throw new UnsupportedOperationException("create table transaction unsupported newReplacePartitions"); + } + + @Override + public ExpireSnapshots expireSnapshots() { + throw new UnsupportedOperationException("create table transaction unsupported newReplacePartitions"); + } + + @Override + public void commitTransaction() { + ArcticTable table = tableCreator.get(); + try { + Transaction tx; + if (table.isUnkeyedTable()) { + tx = table.asUnkeyedTable().newTransaction(); + } else { + tx = table.asKeyedTable().baseTable().newTransaction(); + } + + AppendFiles appendFiles = tx.newAppend(); + appendDataFiles.forEach(appendFiles::appendFile); + appendFiles.commit(); + tx.commitTransaction(); + } catch (Throwable t) { + rollback.run(); + throw t; + } + } +} diff --git a/core/src/main/java/com/netease/arctic/trace/SnapshotSummary.java b/core/src/main/java/com/netease/arctic/op/SnapshotSummary.java similarity index 96% rename from core/src/main/java/com/netease/arctic/trace/SnapshotSummary.java rename to core/src/main/java/com/netease/arctic/op/SnapshotSummary.java index 4127599a50..a91c4f8d7a 100644 --- a/core/src/main/java/com/netease/arctic/trace/SnapshotSummary.java +++ b/core/src/main/java/com/netease/arctic/op/SnapshotSummary.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.trace; +package com.netease.arctic.op; import com.netease.arctic.ams.api.CommitMetaProducer; diff --git a/core/src/main/java/com/netease/arctic/table/BasicKeyedTable.java b/core/src/main/java/com/netease/arctic/table/BasicKeyedTable.java index 8056363042..0137b847ab 100644 --- a/core/src/main/java/com/netease/arctic/table/BasicKeyedTable.java +++ b/core/src/main/java/com/netease/arctic/table/BasicKeyedTable.java @@ -18,18 +18,17 @@ package com.netease.arctic.table; -import com.netease.arctic.AmsClient; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.io.ArcticFileIO; import com.netease.arctic.op.KeyedPartitionRewrite; import com.netease.arctic.op.KeyedSchemaUpdate; import com.netease.arctic.op.OverwriteBaseFiles; import com.netease.arctic.op.RewritePartitions; +import com.netease.arctic.op.SnapshotSummary; import com.netease.arctic.op.UpdateKeyedTableProperties; import com.netease.arctic.scan.BasicKeyedTableScan; import com.netease.arctic.scan.ChangeTableIncrementalScan; import com.netease.arctic.scan.KeyedTableScan; -import com.netease.arctic.trace.SnapshotSummary; import com.netease.arctic.utils.TablePropertyUtil; import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.AppendFiles; @@ -202,8 +201,8 @@ public static class BaseInternalTable extends BasicUnkeyedTable implements BaseT public BaseInternalTable( TableIdentifier tableIdentifier, Table baseIcebergTable, ArcticFileIO arcticFileIO, - AmsClient client, Map catalogProperties) { - super(tableIdentifier, baseIcebergTable, arcticFileIO, client, catalogProperties); + Map catalogProperties) { + super(tableIdentifier, baseIcebergTable, arcticFileIO, catalogProperties); } } @@ -211,8 +210,8 @@ public static class ChangeInternalTable extends BasicUnkeyedTable implements Cha public ChangeInternalTable( TableIdentifier tableIdentifier, Table changeIcebergTable, ArcticFileIO arcticFileIO, - AmsClient client, Map catalogProperties) { - super(tableIdentifier, changeIcebergTable, arcticFileIO, client, catalogProperties); + Map catalogProperties) { + super(tableIdentifier, changeIcebergTable, arcticFileIO, catalogProperties); } @Override diff --git a/core/src/main/java/com/netease/arctic/table/BasicTableBuilder.java b/core/src/main/java/com/netease/arctic/table/BasicTableBuilder.java index e4476d9a44..a00cee3095 100644 --- a/core/src/main/java/com/netease/arctic/table/BasicTableBuilder.java +++ b/core/src/main/java/com/netease/arctic/table/BasicTableBuilder.java @@ -22,7 +22,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; -import org.apache.iceberg.Transaction; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -82,10 +81,5 @@ public TableBuilder withPrimaryKeySpec(PrimaryKeySpec primaryKeySpec) { return self(); } - @Override - public Transaction newCreateTableTransaction() { - throw new UnsupportedOperationException("do not support create table transactional."); - } - protected abstract ThisT self(); } diff --git a/core/src/main/java/com/netease/arctic/table/BasicUnkeyedTable.java b/core/src/main/java/com/netease/arctic/table/BasicUnkeyedTable.java index dc02c0c7e4..75f9ecda7b 100644 --- a/core/src/main/java/com/netease/arctic/table/BasicUnkeyedTable.java +++ b/core/src/main/java/com/netease/arctic/table/BasicUnkeyedTable.java @@ -18,22 +18,17 @@ package com.netease.arctic.table; -import com.netease.arctic.AmsClient; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.io.ArcticFileIO; +import com.netease.arctic.op.ArcticAppendFiles; +import com.netease.arctic.op.ArcticDeleteFiles; +import com.netease.arctic.op.ArcticOverwriteFiles; +import com.netease.arctic.op.ArcticReplacePartitions; +import com.netease.arctic.op.ArcticRewriteFiles; +import com.netease.arctic.op.ArcticRowDelta; +import com.netease.arctic.op.ArcticTransaction; import com.netease.arctic.op.PartitionPropertiesUpdate; import com.netease.arctic.op.UpdatePartitionProperties; -import com.netease.arctic.trace.AmsTableTracer; -import com.netease.arctic.trace.ArcticAppendFiles; -import com.netease.arctic.trace.ArcticDeleteFiles; -import com.netease.arctic.trace.ArcticOverwriteFiles; -import com.netease.arctic.trace.ArcticReplacePartitions; -import com.netease.arctic.trace.ArcticRewriteFiles; -import com.netease.arctic.trace.ArcticRowDelta; -import com.netease.arctic.trace.ArcticTransaction; -import com.netease.arctic.trace.TraceOperations; -import com.netease.arctic.trace.TracedSchemaUpdate; -import com.netease.arctic.trace.TracedUpdateProperties; import com.netease.arctic.utils.CatalogUtil; import com.netease.arctic.utils.TablePropertyUtil; import org.apache.iceberg.AppendFiles; @@ -66,7 +61,6 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.util.StructLikeMap; - import java.util.List; import java.util.Map; @@ -80,19 +74,12 @@ public class BasicUnkeyedTable implements UnkeyedTable, HasTableOperations { protected final Table icebergTable; protected final ArcticFileIO arcticFileIO; - /** - * @deprecated since 0.5.0, will be removed in 0.6.0; - */ - @Deprecated - private final AmsClient client; - public BasicUnkeyedTable( TableIdentifier tableIdentifier, Table icebergTable, ArcticFileIO arcticFileIO, - AmsClient client, Map catalogProperties) { + Map catalogProperties) { this.tableIdentifier = tableIdentifier; this.icebergTable = icebergTable; this.arcticFileIO = arcticFileIO; - this.client = client; this.catalogProperties = catalogProperties; } @@ -187,12 +174,7 @@ public List history() { @Override public UpdateSchema updateSchema() { - if (client != null) { - return new TracedSchemaUpdate(icebergTable.updateSchema(), - new AmsTableTracer(this, TraceOperations.UPDATE_SCHEMA, client, false)); - } else { - return icebergTable.updateSchema(); - } + return icebergTable.updateSchema(); } @Override @@ -202,13 +184,7 @@ public UpdatePartitionSpec updateSpec() { @Override public UpdateProperties updateProperties() { - UpdateProperties updateProperties = icebergTable.updateProperties(); - if (client != null) { - AmsTableTracer tracer = new AmsTableTracer(this, TraceOperations.UPDATE_PROPERTIES, client, false); - return new TracedUpdateProperties(updateProperties, tracer); - } else { - return updateProperties; - } + return icebergTable.updateProperties(); } @Override @@ -224,7 +200,6 @@ public UpdateLocation updateLocation() { @Override public AppendFiles newAppend() { return ArcticAppendFiles.buildFor(this, false) - .traceTable(client, this) .onTableStore(icebergTable) .build(); } @@ -232,7 +207,6 @@ public AppendFiles newAppend() { @Override public AppendFiles newFastAppend() { return ArcticAppendFiles.buildFor(this, true) - .traceTable(client, this) .onTableStore(icebergTable) .build(); } @@ -240,7 +214,6 @@ public AppendFiles newFastAppend() { @Override public RewriteFiles newRewrite() { return ArcticRewriteFiles.buildFor(this) - .traceTable(client, this) .onTableStore(icebergTable) .build(); } @@ -253,25 +226,24 @@ public RewriteManifests rewriteManifests() { @Override public OverwriteFiles newOverwrite() { return ArcticOverwriteFiles.buildFor(this) - .traceTable(client, this).onTableStore(icebergTable).build(); + .onTableStore(icebergTable).build(); } @Override public RowDelta newRowDelta() { return ArcticRowDelta.buildFor(this) - .traceTable(client, this).onTableStore(icebergTable).build(); + .onTableStore(icebergTable).build(); } @Override public ReplacePartitions newReplacePartitions() { return ArcticReplacePartitions.buildFor(this) - .traceTable(client, this).onTableStore(icebergTable).build(); + .onTableStore(icebergTable).build(); } @Override public DeleteFiles newDelete() { return ArcticDeleteFiles.buildFor(this) - .traceTable(client, this) .onTableStore(icebergTable) .build(); } @@ -289,11 +261,7 @@ public ManageSnapshots manageSnapshots() { @Override public Transaction newTransaction() { Transaction transaction = icebergTable.newTransaction(); - AmsTableTracer tableTracer = null; - if (client != null) { - tableTracer = new AmsTableTracer(this, client, false); - } - return new ArcticTransaction(this, transaction, tableTracer); + return new ArcticTransaction(this, transaction); } @Override diff --git a/core/src/main/java/com/netease/arctic/table/TableBuilder.java b/core/src/main/java/com/netease/arctic/table/TableBuilder.java index 6da9bb0c65..0ce362e9ad 100644 --- a/core/src/main/java/com/netease/arctic/table/TableBuilder.java +++ b/core/src/main/java/com/netease/arctic/table/TableBuilder.java @@ -79,5 +79,5 @@ public interface TableBuilder { /** * Create a transaction for create table; */ - Transaction newCreateTableTransaction(); + Transaction createTransaction(); } diff --git a/core/src/main/java/com/netease/arctic/trace/AmsTableTracer.java b/core/src/main/java/com/netease/arctic/trace/AmsTableTracer.java deleted file mode 100644 index 71afbf28db..0000000000 --- a/core/src/main/java/com/netease/arctic/trace/AmsTableTracer.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.trace; - -import com.netease.arctic.AmsClient; -import com.netease.arctic.ams.api.CommitMetaProducer; -import com.netease.arctic.ams.api.Constants; -import com.netease.arctic.ams.api.SchemaUpdateMeta; -import com.netease.arctic.ams.api.TableChange; -import com.netease.arctic.ams.api.TableCommitMeta; -import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.ChangeTable; -import com.netease.arctic.table.UnkeyedTable; -import com.netease.arctic.utils.CatalogUtil; -import com.netease.arctic.utils.ConvertStructUtil; -import com.netease.arctic.utils.SnapshotFileUtil; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotSummary; -import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Implementation of {@link TableTracer}, trace table changes and report changes to ams when committing ignore errors. - */ -public class AmsTableTracer implements TableTracer { - - private static final Logger LOG = LoggerFactory.getLogger(AmsTableTracer.class); - - private final ArcticTable table; - private final String innerTable; - private final AmsClient client; - - private final Map snapshotSummary = new HashMap<>(); - private final Map transactionSnapshotTableChanges = new LinkedHashMap<>(); - private final List updateColumns = new ArrayList<>(); - - private String action; - private Map properties; - private InternalTableChange defaultTableChange; - - /** - * Construct a new AmsTableTracer - * - * @param table table object - * @param action Table change operation name - * @param client AMS client - * @param commitNewSnapshot Whether to submit a new snapshot(but not a transaction) - */ - public AmsTableTracer(UnkeyedTable table, String action, AmsClient client, boolean commitNewSnapshot) { - this.innerTable = table instanceof ChangeTable ? - Constants.INNER_TABLE_CHANGE : Constants.INNER_TABLE_BASE; - this.table = table; - this.client = client; - if (commitNewSnapshot) { - this.defaultTableChange = new InternalTableChange(); - } - setAction(action); - } - - public AmsTableTracer(UnkeyedTable table, AmsClient client, boolean commitNewSnapshot) { - this(table, null, client, commitNewSnapshot); - } - - @Override - public void addDataFile(DataFile dataFile) { - getDefaultChange().addDataFile(dataFile); - } - - @Override - public void deleteDataFile(DataFile dataFile) { - getDefaultChange().deleteDataFile(dataFile); - } - - @Override - public void addDeleteFile(DeleteFile deleteFile) { - getDefaultChange().addDeleteFile(deleteFile); - } - - @Override - public void deleteDeleteFile(DeleteFile deleteFile) { - getDefaultChange().deleteDeleteFile(deleteFile); - } - - private InternalTableChange getDefaultChange() { - if (defaultTableChange == null) { - throw new RuntimeException("This operation should not result in changes to files or snapshots"); - } - return defaultTableChange; - } - - public void addTransactionTableSnapshot(Long snapshotId, AmsTableTracer.InternalTableChange internalTableChange) { - transactionSnapshotTableChanges.putIfAbsent(snapshotId, internalTableChange); - } - - public ArcticTable table() { - return table; - } - - public String innerTable() { - return innerTable; - } - - @Override - public void commit() { - TableCommitMeta commitMeta = new TableCommitMeta(); - commitMeta.setTableIdentifier(table.id().buildTableIdentifier()); - commitMeta.setAction(action); - String commitMetaSource = PropertyUtil.propertyAsString( - snapshotSummary, - com.netease.arctic.trace.SnapshotSummary.SNAPSHOT_PRODUCER, - com.netease.arctic.trace.SnapshotSummary.SNAPSHOT_PRODUCER_DEFAULT); - commitMeta.setCommitMetaProducer(CommitMetaProducer.valueOf(commitMetaSource)); - commitMeta.setCommitTime(System.currentTimeMillis()); - boolean update = false; - - - if (transactionSnapshotTableChanges.size() > 0) { - transactionSnapshotTableChanges.forEach((snapshotId, internalTableChange) -> { - if (table.isUnkeyedTable()) { - Snapshot snapshot = table.asUnkeyedTable().snapshot(snapshotId); - TableChange tableChange = internalTableChange.toTableChange(table, snapshot, innerTable); - commitMeta.addToChanges(tableChange); - } - }); - update = true; - } else if (defaultTableChange != null) { - Table traceTable; - if (table.isUnkeyedTable()) { - traceTable = table.asUnkeyedTable(); - } else { - throw new IllegalStateException("can't apply table change on keyed table."); - } - - TableChange tableChange = - defaultTableChange.toTableChange(table, traceTable.currentSnapshot(), innerTable); - commitMeta.addToChanges(tableChange); - update = true; - } - - if (updateColumns.size() > 0 && Constants.INNER_TABLE_BASE.equals(innerTable)) { - int schemaId = table.schema().schemaId(); - SchemaUpdateMeta ddlCommitMeta = new SchemaUpdateMeta(); - ddlCommitMeta.setSchemaId(schemaId); - List commitUpdateColumns = - updateColumns.stream().map(AmsTableTracer::covert).collect(Collectors.toList()); - ddlCommitMeta.setUpdateColumns(commitUpdateColumns); - commitMeta.setSchemaUpdateMeta(ddlCommitMeta); - update = true; - } - if (this.properties != null && Constants.INNER_TABLE_BASE.equals(innerTable)) { - try { - Map catalogProperties = client.getCatalog(table.id().getCatalog()).getCatalogProperties(); - if (catalogProperties != null) { - commitMeta.setProperties(CatalogUtil.mergeCatalogPropertiesToTable(this.properties, catalogProperties)); - } else { - commitMeta.setProperties(this.properties); - } - update = true; - } catch (TException e) { - LOG.warn("get catalog properties error", e); - } - } - if (!update) { - return; - } - - try { - client.tableCommit(commitMeta); - } catch (Throwable t) { - LOG.warn("trace table commit failed", t); - } - } - - @Override - public void replaceProperties(Map newProperties) { - this.properties = newProperties; - } - - @Override - public void updateColumn(UpdateColumn updateColumn) { - updateColumns.add(updateColumn); - } - - @Override - public void setSnapshotSummary(String key, String value) { - snapshotSummary.put(key, value); - } - - public void setAction(String action) { - this.action = action; - } - - public static class InternalTableChange { - private final List addedFiles = Lists.newArrayList(); - private final List deletedFiles = Lists.newArrayList(); - private final List addedDeleteFiles = Lists.newArrayList(); - private final List deletedDeleteFiles = Lists.newArrayList(); - - public InternalTableChange() { - } - - public void addDataFile(DataFile dataFile) { - addedFiles.add(dataFile); - } - - public void deleteDataFile(DataFile dataFile) { - deletedFiles.add(dataFile); - } - - public void addDeleteFile(DeleteFile deleteFile) { - addedDeleteFiles.add(deleteFile); - } - - public void deleteDeleteFile(DeleteFile deleteFile) { - deletedDeleteFiles.add(deleteFile); - } - - /** - * Build {@link TableChange} to report to ams. - * - * @param arcticTable arctic table which table change belongs - * @param snapshot the snapshot produced in this operation - * @param innerTable inner table name - * @return table change - */ - public TableChange toTableChange(ArcticTable arcticTable, Snapshot snapshot, String innerTable) { - - long currentSnapshotId = snapshot.snapshotId(); - long parentSnapshotId = - snapshot.parentId() == null ? -1 : snapshot.parentId(); - Map summary = snapshot.summary(); - long realAddedDataFiles = summary.get(SnapshotSummary.ADDED_FILES_PROP) == null ? - 0 : Long.parseLong(summary.get(SnapshotSummary.ADDED_FILES_PROP)); - long realDeletedDataFiles = summary.get(SnapshotSummary.DELETED_FILES_PROP) == null ? - 0 : Long.parseLong(summary.get(SnapshotSummary.DELETED_FILES_PROP)); - long realAddedDeleteFiles = summary.get(SnapshotSummary.ADDED_DELETE_FILES_PROP) == null ? - 0 : Long.parseLong(summary.get(SnapshotSummary.ADDED_DELETE_FILES_PROP)); - long readRemovedDeleteFiles = summary.get(SnapshotSummary.REMOVED_DELETE_FILES_PROP) == null ? - 0 : Long.parseLong(summary.get(SnapshotSummary.REMOVED_DELETE_FILES_PROP)); - - List addFiles = new ArrayList<>(); - List deleteFiles = new ArrayList<>(); - if (realAddedDataFiles == addedFiles.size() && realDeletedDataFiles == deletedFiles.size() && - realAddedDeleteFiles == addedDeleteFiles.size() && readRemovedDeleteFiles == deletedDeleteFiles.size()) { - addFiles = - addedFiles.stream().map(file -> ConvertStructUtil.convertToAmsDatafile(file, arcticTable, innerTable)) - .collect(Collectors.toList()); - deleteFiles = - deletedFiles.stream().map(file -> ConvertStructUtil.convertToAmsDatafile(file, arcticTable, innerTable)) - .collect(Collectors.toList()); - addFiles.addAll(addedDeleteFiles.stream() - .map(file -> ConvertStructUtil.convertToAmsDatafile(file, arcticTable, innerTable)) - .collect(Collectors.toList())); - deleteFiles.addAll(deletedDeleteFiles.stream() - .map(file -> ConvertStructUtil.convertToAmsDatafile(file, arcticTable, innerTable)) - .collect(Collectors.toList())); - } else { - // tracer file change info is different from iceberg snapshot, should get iceberg real file change info - SnapshotFileUtil.getSnapshotFiles(arcticTable, innerTable, snapshot, addFiles, deleteFiles); - } - - return new TableChange(innerTable, addFiles, deleteFiles, currentSnapshotId, - parentSnapshotId, snapshot.sequenceNumber()); - } - } - - private static com.netease.arctic.ams.api.UpdateColumn covert(UpdateColumn updateColumn) { - com.netease.arctic.ams.api.UpdateColumn commit = new com.netease.arctic.ams.api.UpdateColumn(); - commit.setName(updateColumn.getName()); - commit.setParent(updateColumn.getParent()); - commit.setType(updateColumn.getType() == null ? null : updateColumn.getType().toString()); - commit.setDoc(updateColumn.getDoc()); - commit.setOperate(updateColumn.getOperate().name()); - commit.setIsOptional(updateColumn.getOptional() == null ? null : updateColumn.getOptional().toString()); - commit.setNewName(updateColumn.getNewName()); - return commit; - } -} diff --git a/core/src/main/java/com/netease/arctic/trace/CreateTableTransaction.java b/core/src/main/java/com/netease/arctic/trace/CreateTableTransaction.java deleted file mode 100644 index 01b5ac44cc..0000000000 --- a/core/src/main/java/com/netease/arctic/trace/CreateTableTransaction.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.trace; - -import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.KeyedTable; -import com.netease.arctic.table.UnkeyedTable; -import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.DeleteFiles; -import org.apache.iceberg.ExpireSnapshots; -import org.apache.iceberg.HasTableOperations; -import org.apache.iceberg.HistoryEntry; -import org.apache.iceberg.ManageSnapshots; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.OverwriteFiles; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.ReplacePartitions; -import org.apache.iceberg.ReplaceSortOrder; -import org.apache.iceberg.RewriteFiles; -import org.apache.iceberg.RewriteManifests; -import org.apache.iceberg.RowDelta; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotRef; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.StatisticsFile; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.Transaction; -import org.apache.iceberg.UpdateLocation; -import org.apache.iceberg.UpdatePartitionSpec; -import org.apache.iceberg.UpdateProperties; -import org.apache.iceberg.UpdateSchema; -import org.apache.iceberg.encryption.EncryptionManager; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.LocationProvider; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; -import java.util.function.Supplier; - - -public class CreateTableTransaction implements Transaction { - - private final Transaction fakeTransaction; - - private final TransactionTracker transactionTracer; - private final Table transactionTable; - private final Supplier createTable; - private final Runnable rollback; - - public CreateTableTransaction( - Transaction fakeTransaction, - Supplier createTable, - Runnable rollback) { - this.fakeTransaction = fakeTransaction; - this.transactionTracer = new TransactionTracker(); - this.transactionTable = new TransactionTable(); - this.createTable = createTable; - this.rollback = rollback; - } - - @Override - public Table table() { - return transactionTable; - } - - @Override - public UpdateSchema updateSchema() { - throw new UnsupportedOperationException("create table transaction unsupported updateSchema"); - } - - @Override - public UpdatePartitionSpec updateSpec() { - throw new UnsupportedOperationException("create table transaction unsupported updateSpec"); - } - - @Override - public UpdateProperties updateProperties() { - throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); - } - - @Override - public ReplaceSortOrder replaceSortOrder() { - throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); - } - - @Override - public UpdateLocation updateLocation() { - throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); - } - - @Override - public AppendFiles newAppend() { - return new AppendFiles() { - @Override - public AppendFiles appendFile(DataFile file) { - transactionTracer.addDataFile(file); - return this; - } - - @Override - public AppendFiles appendManifest(ManifestFile file) { - throw new UnsupportedOperationException("create table transaction AppendFiles unsupported ManifestFile"); - } - - @Override - public AppendFiles set(String property, String value) { - throw new UnsupportedOperationException("create table transaction AppendFiles unsupported set"); - } - - @Override - public AppendFiles deleteWith(Consumer deleteFunc) { - throw new UnsupportedOperationException("create table transaction AppendFiles unsupported deleteWith"); - } - - @Override - public AppendFiles stageOnly() { - throw new UnsupportedOperationException("create table transaction AppendFiles unsupported stageOnly"); - } - - @Override - public AppendFiles scanManifestsWith(ExecutorService executorService) { - throw new UnsupportedOperationException("create table transaction AppendFiles unsupported scanManifestsWith"); - } - - @Override - public Snapshot apply() { - throw new UnsupportedOperationException("create table transaction AppendFiles unsupported apply"); - } - - @Override - public void commit() { - transactionTracer.commit(); - } - }; - } - - @Override - public AppendFiles newFastAppend() { - throw new UnsupportedOperationException("create table transaction unsupported updateProperties"); - } - - @Override - public RewriteFiles newRewrite() { - throw new UnsupportedOperationException("create table transaction unsupported newRewrite"); - } - - @Override - public RewriteManifests rewriteManifests() { - throw new UnsupportedOperationException("create table transaction unsupported rewriteManifests"); - } - - @Override - public OverwriteFiles newOverwrite() { - throw new UnsupportedOperationException("create table transaction unsupported newOverwrite"); - } - - @Override - public RowDelta newRowDelta() { - throw new UnsupportedOperationException("create table transaction unsupported newRowDelta"); - } - - @Override - public ReplacePartitions newReplacePartitions() { - throw new UnsupportedOperationException("create table transaction unsupported newReplacePartitions"); - } - - @Override - public DeleteFiles newDelete() { - throw new UnsupportedOperationException("create table transaction unsupported newReplacePartitions"); - } - - @Override - public ExpireSnapshots expireSnapshots() { - throw new UnsupportedOperationException("create table transaction unsupported newReplacePartitions"); - } - - @Override - public void commitTransaction() { - try { - ArcticTable arcticTable = createTable.get(); - if (!transactionTracer.add.isEmpty()) { - if (!transactionTracer.isCommit) { - throw new IllegalStateException("last operation has not committed"); - } - Transaction transaction; - if (arcticTable.isUnkeyedTable()) { - UnkeyedTable table = arcticTable.asUnkeyedTable(); - transaction = table.newTransaction(); - } else { - KeyedTable keyedTable = arcticTable.asKeyedTable(); - transaction = keyedTable.baseTable().newTransaction(); - } - AppendFiles appendFiles = transaction.newAppend(); - for (DataFile dataFile : transactionTracer.add) { - appendFiles.appendFile(dataFile); - } - appendFiles.commit(); - transaction.commitTransaction(); - } - } catch (Throwable t) { - rollback.run(); - throw t; - } - } - - static class TransactionTracker implements TableTracer { - - private final List add = new ArrayList<>(); - - private boolean isCommit; - - @Override - public void addDataFile(DataFile dataFile) { - add.add(dataFile); - } - - @Override - public void deleteDataFile(DataFile dataFile) { - } - - @Override - public void addDeleteFile(DeleteFile deleteFile) { - } - - @Override - public void deleteDeleteFile(DeleteFile deleteFile) { - } - - @Override - public void commit() { - isCommit = true; - } - - @Override - public void replaceProperties(Map newProperties) { - } - - @Override - public void setSnapshotSummary(String key, String value) { - } - - @Override - public void updateColumn(UpdateColumn updateColumn) { - - } - } - - class TransactionTable implements Table, HasTableOperations, Serializable { - - Table transactionTable; - - public TransactionTable() { - transactionTable = fakeTransaction.table(); - } - - @Override - public TableOperations operations() { - if (transactionTable instanceof HasTableOperations) { - return ((HasTableOperations) transactionTable).operations(); - } - throw new IllegalStateException("table does not support operations"); - } - - @Override - public String name() { - return transactionTable.name(); - } - - @Override - public void refresh() { - transactionTable.refresh(); - } - - @Override - public TableScan newScan() { - return transactionTable.newScan(); - } - - @Override - public Schema schema() { - return transactionTable.schema(); - } - - @Override - public Map schemas() { - return transactionTable.schemas(); - } - - @Override - public PartitionSpec spec() { - return transactionTable.spec(); - } - - @Override - public Map specs() { - return transactionTable.specs(); - } - - @Override - public SortOrder sortOrder() { - return transactionTable.sortOrder(); - } - - @Override - public Map sortOrders() { - return transactionTable.sortOrders(); - } - - @Override - public Map properties() { - return transactionTable.properties(); - } - - @Override - public String location() { - return transactionTable.location(); - } - - @Override - public Snapshot currentSnapshot() { - return transactionTable.currentSnapshot(); - } - - @Override - public Snapshot snapshot(long snapshotId) { - return transactionTable.snapshot(snapshotId); - } - - @Override - public Iterable snapshots() { - return transactionTable.snapshots(); - } - - @Override - public List history() { - return transactionTable.history(); - } - - @Override - public UpdateSchema updateSchema() { - return transactionTable.updateSchema(); - } - - @Override - public UpdatePartitionSpec updateSpec() { - return transactionTable.updateSpec(); - } - - @Override - public UpdateProperties updateProperties() { - return transactionTable.updateProperties(); - } - - @Override - public ReplaceSortOrder replaceSortOrder() { - return transactionTable.replaceSortOrder(); - } - - @Override - public UpdateLocation updateLocation() { - return transactionTable.updateLocation(); - } - - @Override - public AppendFiles newAppend() { - return transactionTable.newAppend(); - } - - @Override - public AppendFiles newFastAppend() { - return transactionTable.newFastAppend(); - } - - @Override - public RewriteFiles newRewrite() { - return transactionTable.newRewrite(); - } - - @Override - public RewriteManifests rewriteManifests() { - return transactionTable.rewriteManifests(); - } - - @Override - public OverwriteFiles newOverwrite() { - return transactionTable.newOverwrite(); - } - - @Override - public RowDelta newRowDelta() { - return transactionTable.newRowDelta(); - } - - @Override - public ReplacePartitions newReplacePartitions() { - return transactionTable.newReplacePartitions(); - } - - @Override - public DeleteFiles newDelete() { - return transactionTable.newDelete(); - } - - @Override - public ExpireSnapshots expireSnapshots() { - return transactionTable.expireSnapshots(); - } - - - @Override - public ManageSnapshots manageSnapshots() { - return transactionTable.manageSnapshots(); - } - - @Override - public Transaction newTransaction() { - return transactionTable.newTransaction(); - } - - @Override - public FileIO io() { - return transactionTable.io(); - } - - @Override - public EncryptionManager encryption() { - return transactionTable.encryption(); - } - - @Override - public LocationProvider locationProvider() { - return transactionTable.locationProvider(); - } - - @Override - public List statisticsFiles() { - return transactionTable.statisticsFiles(); - } - - @Override - public Map refs() { - return transactionTable.refs(); - } - - @Override - public String toString() { - return transactionTable.toString(); - } - } -} - - - - diff --git a/core/src/main/java/com/netease/arctic/trace/TableTracer.java b/core/src/main/java/com/netease/arctic/trace/TableTracer.java deleted file mode 100644 index 823f016457..0000000000 --- a/core/src/main/java/com/netease/arctic/trace/TableTracer.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.trace; - -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.types.Type; - -import java.util.Map; - -/** - * Tracing table changes. - * @deprecated since 0.5.0, will be removed in 0.6.0; - */ -@Deprecated -public interface TableTracer { - - /** - * Add a {@link DataFile} into table - * @param dataFile file to add - */ - void addDataFile(DataFile dataFile); - - /** - * Delete a {@link DataFile} from table - * @param dataFile file to delete - */ - void deleteDataFile(DataFile dataFile); - - /** - * Add a {@link DeleteFile} into table - * @param deleteFile file to add - */ - void addDeleteFile(DeleteFile deleteFile); - - /** - * Add a {@link DataFile} into table - * @param deleteFile file to delete - */ - void deleteDeleteFile(DeleteFile deleteFile); - - /** - * Replace some properties of table - * @param newProperties properties to replace - */ - void replaceProperties(Map newProperties); - - /** - * Set a summary property in the snapshot produced by this update. - * - * @param key a String property name - * @param value a String property value - */ - void setSnapshotSummary(String key, String value); - - /** - * update column of table - * @param updateColumn updated column info - */ - void updateColumn(UpdateColumn updateColumn); - - /** - * Commit table changes. - */ - void commit(); - - class UpdateColumn { - private final String parent; - private final String name; - private final Type type; - private final String doc; - private final AmsTableTracer.SchemaOperateType operate; - private final Boolean isOptional; - private final String newName; - - public UpdateColumn( - String name, - String parent, - Type type, - String doc, - SchemaOperateType operate, - Boolean isOptional, - String newName) { - this.parent = parent; - this.name = name; - this.type = type; - this.doc = doc; - this.operate = operate; - this.isOptional = isOptional; - this.newName = newName; - } - - public String getParent() { - return parent; - } - - public String getName() { - return name; - } - - public Type getType() { - return type; - } - - public String getDoc() { - return doc; - } - - public SchemaOperateType getOperate() { - return operate; - } - - public Boolean getOptional() { - return isOptional; - } - - public String getNewName() { - return newName; - } - } - - enum SchemaOperateType { - ADD, - DROP, - ALERT, - RENAME, - MOVE_BEFORE, - MOVE_AFTER, - MOVE_FIRST - } -} diff --git a/core/src/main/java/com/netease/arctic/trace/TraceOperations.java b/core/src/main/java/com/netease/arctic/trace/TraceOperations.java deleted file mode 100644 index 8446913a7f..0000000000 --- a/core/src/main/java/com/netease/arctic/trace/TraceOperations.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.trace; - -import org.apache.iceberg.DataOperations; - -/** - * Table change operation names. - */ -public class TraceOperations { - public static final String APPEND = DataOperations.APPEND; - public static final String REPLACE = DataOperations.REPLACE; - public static final String OVERWRITE = DataOperations.OVERWRITE; - public static final String DELETE = DataOperations.DELETE; - public static final String UPDATE_PROPERTIES = "update_properties"; - public static final String UPDATE_SCHEMA = "update_schema"; -} diff --git a/core/src/main/java/com/netease/arctic/trace/TracedSchemaUpdate.java b/core/src/main/java/com/netease/arctic/trace/TracedSchemaUpdate.java deleted file mode 100644 index 44afe6d6bd..0000000000 --- a/core/src/main/java/com/netease/arctic/trace/TracedSchemaUpdate.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.trace; - -import com.netease.arctic.table.KeyedTable; -import org.apache.iceberg.Schema; -import org.apache.iceberg.UpdateSchema; -import org.apache.iceberg.types.Type; - -import java.util.Collection; - -/** - * Schema evolution API implementation for {@link KeyedTable}. - */ -public class TracedSchemaUpdate implements UpdateSchema { - private final UpdateSchema updateSchema; - private final TableTracer tracer; - - public TracedSchemaUpdate(UpdateSchema updateSchema, TableTracer tracer) { - this.tracer = tracer; - this.updateSchema = updateSchema; - } - - @Override - public TracedSchemaUpdate allowIncompatibleChanges() { - updateSchema.allowIncompatibleChanges(); - return this; - } - - @Override - public UpdateSchema addColumn(String name, Type type, String doc) { - updateSchema.addColumn(name, type, doc); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, type, doc, - TableTracer.SchemaOperateType.ADD, null, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema addColumn(String parent, String name, Type type, String doc) { - updateSchema.addColumn(parent, name, type, doc); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, parent, type, doc, - TableTracer.SchemaOperateType.ADD, false, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema addRequiredColumn(String name, Type type, String doc) { - updateSchema.addRequiredColumn(name, type, doc); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, type, doc, - TableTracer.SchemaOperateType.ADD, false, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema addRequiredColumn(String parent, String name, Type type, String doc) { - updateSchema.addRequiredColumn(parent, name, type, doc); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, parent, type, doc, - TableTracer.SchemaOperateType.ADD, false, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema deleteColumn(String name) { - updateSchema.deleteColumn(name); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.DROP, null, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema renameColumn(String name, String newName) { - updateSchema.renameColumn(name, newName); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.RENAME, null, newName); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema requireColumn(String name) { - updateSchema.requireColumn(name); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.ALERT, false, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema makeColumnOptional(String name) { - updateSchema.makeColumnOptional(name); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.ALERT, true, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) { - updateSchema.updateColumn(name, newType); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, newType, null, - TableTracer.SchemaOperateType.ALERT, null, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema updateColumnDoc(String name, String doc) { - updateSchema.updateColumnDoc(name, doc); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, doc, - TableTracer.SchemaOperateType.ALERT, null, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema moveFirst(String name) { - updateSchema.moveFirst(name); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.MOVE_FIRST, null, null); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema moveBefore(String name, String beforeName) { - updateSchema.moveBefore(name, beforeName); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.MOVE_BEFORE, null, beforeName); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema moveAfter(String name, String afterName) { - updateSchema.moveAfter(name, afterName); - TableTracer.UpdateColumn updateColumn = new TableTracer.UpdateColumn(name, null, null, null, - TableTracer.SchemaOperateType.MOVE_AFTER, null, afterName); - tracer.updateColumn(updateColumn); - return this; - } - - @Override - public UpdateSchema unionByNameWith(Schema newSchema) { - updateSchema.unionByNameWith(newSchema); - return this; - } - - @Override - public UpdateSchema setIdentifierFields(Collection names) { - throw new UnsupportedOperationException("unsupported setIdentifierFields arctic table."); - } - - @Override - public Schema apply() { - return updateSchema.apply(); - } - - @Override - public void commit() { - updateSchema.commit(); - tracer.commit(); - } -} diff --git a/core/src/main/java/com/netease/arctic/trace/TracedUpdateProperties.java b/core/src/main/java/com/netease/arctic/trace/TracedUpdateProperties.java deleted file mode 100644 index 26be07cfd0..0000000000 --- a/core/src/main/java/com/netease/arctic/trace/TracedUpdateProperties.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netease.arctic.trace; - -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.UpdateProperties; - -import java.util.HashMap; -import java.util.Map; - -/** - * Wrap {@link UpdateProperties} with {@link TableTracer}. - */ -public class TracedUpdateProperties implements UpdateProperties { - - private final UpdateProperties updateProperties; - private final TableTracer tracer; - private Map properties; - - public TracedUpdateProperties(UpdateProperties updateProperties, TableTracer tracer) { - this.updateProperties = updateProperties; - this.tracer = tracer; - } - - @Override - public Map apply() { - Map props = updateProperties.apply(); - this.properties = new HashMap<>(props); - tracer.replaceProperties(this.properties); - return props; - } - - @Override - public void commit() { - if (this.properties == null) { - this.apply(); - } - this.updateProperties.commit(); - this.tracer.commit(); - } - - @Override - public UpdateProperties set(String key, String value) { - updateProperties.set(key, value); - this.properties = null; - return this; - } - - @Override - public UpdateProperties remove(String key) { - updateProperties.remove(key); - this.properties = null; - return this; - } - - @Override - public UpdateProperties defaultFormat(FileFormat format) { - updateProperties.defaultFormat(format); - this.properties = null; - return this; - } -} diff --git a/core/src/test/java/com/netease/arctic/trace/TestTableTracer.java b/core/src/test/java/com/netease/arctic/trace/TestTableTracer.java index 303aac4100..a593416b44 100644 --- a/core/src/test/java/com/netease/arctic/trace/TestTableTracer.java +++ b/core/src/test/java/com/netease/arctic/trace/TestTableTracer.java @@ -31,6 +31,7 @@ import com.netease.arctic.data.DataFileType; import com.netease.arctic.io.writer.GenericTaskWriters; import com.netease.arctic.io.writer.SortedPosDeleteWriter; +import com.netease.arctic.op.SnapshotSummary; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.DataOperations; diff --git a/hive/src/main/java/com/netease/arctic/hive/catalog/MixedHiveTables.java b/hive/src/main/java/com/netease/arctic/hive/catalog/MixedHiveTables.java index 8e71f3a533..46ae67dc9f 100644 --- a/hive/src/main/java/com/netease/arctic/hive/catalog/MixedHiveTables.java +++ b/hive/src/main/java/com/netease/arctic/hive/catalog/MixedHiveTables.java @@ -1,6 +1,5 @@ package com.netease.arctic.hive.catalog; -import com.netease.arctic.AmsClient; import com.netease.arctic.ams.api.CatalogMeta; import com.netease.arctic.ams.api.TableMeta; import com.netease.arctic.ams.api.properties.MetaTableProperties; @@ -40,11 +39,7 @@ public class MixedHiveTables extends MixedTables { private volatile CachedHiveClientPool hiveClientPool; public MixedHiveTables(CatalogMeta catalogMeta) { - this(catalogMeta, null); - } - - public MixedHiveTables(CatalogMeta catalogMeta, AmsClient amsClient) { - super(catalogMeta, amsClient); + super(catalogMeta); this.hiveClientPool = new CachedHiveClientPool(getTableMetaStore(), catalogMeta.getCatalogProperties()); } @@ -66,15 +61,15 @@ protected KeyedHiveTable loadKeyedTable(TableMeta tableMeta) { Table baseIcebergTable = tableMetaStore.doAs(() -> tables.load(baseLocation)); UnkeyedHiveTable baseTable = new KeyedHiveTable.HiveBaseInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(baseIcebergTable, baseLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, tableLocation, amsClient, hiveClientPool, catalogMeta.getCatalogProperties(), false); + fileIO, tableLocation, hiveClientPool, catalogMeta.getCatalogProperties(), false); Table changeIcebergTable = tableMetaStore.doAs(() -> tables.load(changeLocation)); ChangeTable changeTable = new KeyedHiveTable.HiveChangeInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, catalogMeta.getCatalogProperties()); return new KeyedHiveTable(tableMeta, tableLocation, - buildPrimaryKeySpec(baseTable.schema(), tableMeta), amsClient, hiveClientPool, baseTable, changeTable); + buildPrimaryKeySpec(baseTable.schema(), tableMeta), hiveClientPool, baseTable, changeTable); } /** @@ -97,7 +92,7 @@ protected UnkeyedHiveTable loadUnKeyedTable(TableMeta tableMeta) { checkPrivilege(fileIO, baseLocation); Table table = tableMetaStore.doAs(() -> tables.load(baseLocation)); return new UnkeyedHiveTable(tableIdentifier, CatalogUtil.useArcticTableOperations(table, baseLocation, - fileIO, tableMetaStore.getConfiguration()), fileIO, tableLocation, amsClient, hiveClientPool, + fileIO, tableMetaStore.getConfiguration()), fileIO, tableLocation, hiveClientPool, catalogMeta.getCatalogProperties()); } @@ -132,7 +127,7 @@ protected KeyedTable createKeyedTable(TableMeta tableMeta, Schema schema, Primar UnkeyedHiveTable baseTable = new KeyedHiveTable.HiveBaseInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(baseIcebergTable, baseLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, tableLocation, amsClient, hiveClientPool, catalogMeta.getCatalogProperties(), false); + fileIO, tableLocation, hiveClientPool, catalogMeta.getCatalogProperties(), false); Table changeIcebergTable = tableMetaStore.doAs(() -> { try { @@ -147,7 +142,7 @@ protected KeyedTable createKeyedTable(TableMeta tableMeta, Schema schema, Primar ChangeTable changeTable = new KeyedHiveTable.HiveChangeInternalTable(tableIdentifier, CatalogUtil.useArcticTableOperations(changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()), - fileIO, amsClient, catalogMeta.getCatalogProperties()); + fileIO, catalogMeta.getCatalogProperties()); Map metaProperties = tableMeta.getProperties(); try { @@ -173,7 +168,7 @@ protected KeyedTable createKeyedTable(TableMeta tableMeta, Schema schema, Primar throw new RuntimeException("Failed to create hive table:" + tableMeta.getTableIdentifier(), e); } return new KeyedHiveTable(tableMeta, tableLocation, - primaryKeySpec, amsClient, hiveClientPool, baseTable, changeTable); + primaryKeySpec, hiveClientPool, baseTable, changeTable); } @Override @@ -225,7 +220,7 @@ protected UnkeyedHiveTable createUnKeyedTable(TableMeta tableMeta, Schema schema tableIdentifier, tableLocation, tableMeta.getProperties(), tableMetaStore, catalogMeta.getCatalogProperties()); return new UnkeyedHiveTable(tableIdentifier, CatalogUtil.useArcticTableOperations(table, baseLocation, fileIO, - tableMetaStore.getConfiguration()), fileIO, tableLocation, amsClient, hiveClientPool, + tableMetaStore.getConfiguration()), fileIO, tableLocation, hiveClientPool, catalogMeta.getCatalogProperties()); } diff --git a/hive/src/main/java/com/netease/arctic/hive/table/KeyedHiveTable.java b/hive/src/main/java/com/netease/arctic/hive/table/KeyedHiveTable.java index 332642f470..bc09a7f4e9 100644 --- a/hive/src/main/java/com/netease/arctic/hive/table/KeyedHiveTable.java +++ b/hive/src/main/java/com/netease/arctic/hive/table/KeyedHiveTable.java @@ -18,7 +18,6 @@ package com.netease.arctic.hive.table; -import com.netease.arctic.AmsClient; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.ams.api.TableMeta; import com.netease.arctic.hive.HMSClientPool; @@ -52,7 +51,6 @@ public KeyedHiveTable( TableMeta tableMeta, String tableLocation, PrimaryKeySpec primaryKeySpec, - AmsClient client, HMSClientPool hiveClient, UnkeyedHiveTable baseTable, ChangeTable changeTable) { @@ -127,8 +125,8 @@ public static class HiveChangeInternalTable extends BasicUnkeyedTable implements public HiveChangeInternalTable( TableIdentifier tableIdentifier, Table changeIcebergTable, ArcticFileIO arcticFileIO, - AmsClient client, Map catalogProperties) { - super(tableIdentifier, changeIcebergTable, arcticFileIO, client, catalogProperties); + Map catalogProperties) { + super(tableIdentifier, changeIcebergTable, arcticFileIO, catalogProperties); } @Override @@ -151,10 +149,10 @@ public static class HiveBaseInternalTable extends UnkeyedHiveTable implements Ba public HiveBaseInternalTable( TableIdentifier tableIdentifier, Table icebergTable, - ArcticHadoopFileIO arcticFileIO, String tableLocation, AmsClient client, + ArcticHadoopFileIO arcticFileIO, String tableLocation, HMSClientPool hiveClient, Map catalogProperties, boolean syncHiveChange) { - super(tableIdentifier, icebergTable, arcticFileIO, tableLocation, client, hiveClient, catalogProperties, + super(tableIdentifier, icebergTable, arcticFileIO, tableLocation, hiveClient, catalogProperties, syncHiveChange); } diff --git a/hive/src/main/java/com/netease/arctic/hive/table/UnkeyedHiveTable.java b/hive/src/main/java/com/netease/arctic/hive/table/UnkeyedHiveTable.java index a1a6291138..16e4b29d5b 100644 --- a/hive/src/main/java/com/netease/arctic/hive/table/UnkeyedHiveTable.java +++ b/hive/src/main/java/com/netease/arctic/hive/table/UnkeyedHiveTable.java @@ -18,7 +18,6 @@ package com.netease.arctic.hive.table; -import com.netease.arctic.AmsClient; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.hive.HMSClientPool; import com.netease.arctic.hive.HiveTableProperties; @@ -58,7 +57,6 @@ public UnkeyedHiveTable( Table icebergTable, ArcticHadoopFileIO arcticFileIO, String tableLocation, - AmsClient client, HMSClientPool hiveClient, Map catalogProperties) { this( @@ -66,7 +64,6 @@ public UnkeyedHiveTable( icebergTable, arcticFileIO, tableLocation, - client, hiveClient, catalogProperties, true); @@ -77,11 +74,10 @@ public UnkeyedHiveTable( Table icebergTable, ArcticHadoopFileIO arcticFileIO, String tableLocation, - AmsClient client, HMSClientPool hiveClient, Map catalogProperties, boolean syncHiveChange) { - super(tableIdentifier, icebergTable, arcticFileIO, client, catalogProperties); + super(tableIdentifier, icebergTable, arcticFileIO, catalogProperties); this.fileIO = arcticFileIO; this.hiveClient = hiveClient; this.tableLocation = tableLocation; diff --git a/trino/src/main/java/com/netease/arctic/trino/unkeyed/ArcticTrinoCatalog.java b/trino/src/main/java/com/netease/arctic/trino/unkeyed/ArcticTrinoCatalog.java index e39f582a9b..4c63a960cc 100644 --- a/trino/src/main/java/com/netease/arctic/trino/unkeyed/ArcticTrinoCatalog.java +++ b/trino/src/main/java/com/netease/arctic/trino/unkeyed/ArcticTrinoCatalog.java @@ -135,7 +135,7 @@ public Transaction newCreateTableTransaction( String location, Map properties) { return arcticCatalog.newTableBuilder(getTableIdentifier(schemaTableName), schema) .withPartitionSpec(partitionSpec) - .withProperties(properties).newCreateTableTransaction(); + .withProperties(properties).createTransaction(); } @Override diff --git a/trino/src/test/java/com/netease/arctic/trino/iceberg/TestArcticTable.java b/trino/src/test/java/com/netease/arctic/trino/iceberg/TestArcticTable.java index d39b9e6d30..c6cd1d533d 100644 --- a/trino/src/test/java/com/netease/arctic/trino/iceberg/TestArcticTable.java +++ b/trino/src/test/java/com/netease/arctic/trino/iceberg/TestArcticTable.java @@ -59,7 +59,7 @@ public class TestArcticTable extends BasicUnkeyedTable { private TableIdentifier tableIdentifier; public TestArcticTable(BaseTable table, TableIdentifier tableIdentifier) { - super(null, null, null, null, null); + super(null, null, null, null); this.table = table; this.tableIdentifier = tableIdentifier; } diff --git a/trino/src/test/java/com/netease/arctic/trino/iceberg/TestBasicArcticCatalog.java b/trino/src/test/java/com/netease/arctic/trino/iceberg/TestBasicArcticCatalog.java index a638fb011f..9961aae02b 100644 --- a/trino/src/test/java/com/netease/arctic/trino/iceberg/TestBasicArcticCatalog.java +++ b/trino/src/test/java/com/netease/arctic/trino/iceberg/TestBasicArcticCatalog.java @@ -148,7 +148,8 @@ public ArcticTable create() { return null; } - public Transaction newCreateTableTransaction() { + @Override + public Transaction createTransaction() { return catalog.newCreateTableTransaction(org.apache.iceberg.catalog.TableIdentifier.of(identifier.getCatalog(), identifier.getDatabase(), identifier.getTableName()), schema, partitionSpec, location, properties); }