From 4e45907248bdabbc4a0325d05e03de3abd2266d0 Mon Sep 17 00:00:00 2001 From: wangzeyu <1249369293@qq.com> Date: Wed, 31 May 2023 09:26:07 +0800 Subject: [PATCH] [AMS-Refactor] fix optimizing integration test (#1487) * fix optimizing integration test * fix optimizing integration test * fix optimizing integration test * fix optimizing integration test and fix runtime bug * fix optimizing integration test --- .../netease/arctic/ams/api/PropertyNames.java | 1 - .../optimizer/AbstractResourceContainer.java | 3 - .../optimizer/FlinkOptimizerContainer.java | 4 +- .../arctic/optimizer/OptimizerExecutor.java | 2 +- ams/server/pom.xml | 7 + .../arctic/server/ArcticServiceContainer.java | 28 +--- .../server/DefaultOptimizingService.java | 9 +- .../server/dashboard/model/SqlExample.java | 2 +- .../server/optimizing/OptimizingQueue.java | 2 +- .../server/table/DefaultTableService.java | 1 - .../netease/arctic/server/AmsEnvironment.java | 67 ++++++--- .../optimizing/AbstractOptimizingTest.java | 91 +++++++++--- .../optimizing/BaseOptimizingChecker.java | 31 ++-- .../TestIcebergHadoopOptimizing.java | 51 +++++-- .../optimizing/TestMixedHiveOptimizing.java | 24 +-- .../TestMixedIcebergOptimizing.java | 105 ++++++------- .../optimizing/TestOptimizingIntegration.java | 17 +-- .../arctic/IcebergTableTestHelper.java | 138 ------------------ 18 files changed, 246 insertions(+), 337 deletions(-) delete mode 100644 core/src/test/java/com/netease/arctic/IcebergTableTestHelper.java diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/PropertyNames.java b/ams/api/src/main/java/com/netease/arctic/ams/api/PropertyNames.java index 94145e6eea..ceb9b13317 100644 --- a/ams/api/src/main/java/com/netease/arctic/ams/api/PropertyNames.java +++ b/ams/api/src/main/java/com/netease/arctic/ams/api/PropertyNames.java @@ -4,7 +4,6 @@ public class PropertyNames { public static final String RESOURCE_ID = "resource-id"; public static final String EXPORT_PROPERTY_PREFIX = "export."; - public static final String JAVA_PARAMETERS_PROPERTY = "jvm-args"; public static final String OPTIMIZER_AMS_URL = "ams-url"; public static final String AMS_HOME = "ams-home"; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java b/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java index bb3cad8974..f1163d6ed2 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java +++ b/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java @@ -71,9 +71,6 @@ protected String buildOptimizerStartupArgsString(Resource resource) { .append(resource.getProperties().get(PropertyNames.OPTIMIZER_MEMORY_STORAGE_SIZE)); } } - if (containerProperties.containsKey(PropertyNames.JAVA_PARAMETERS_PROPERTY)) { - stringBuilder.append(containerProperties.get(PropertyNames.JAVA_PARAMETERS_PROPERTY)); - } if (resource.getResourceId() != null && resource.getResourceId().length() > 0) { stringBuilder.append(" -id ").append(resource.getResourceId()); } diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java b/ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java index f20dee6d9c..6364f52847 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java +++ b/ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java @@ -72,9 +72,9 @@ protected Map doScaleOut(String startUpArgs) { @Override 
protected String buildOptimizerStartupArgsString(Resource resource) { - String taskManagerMemory = PropertyUtil.checkAndGetProperty(getContainerProperties(), + String taskManagerMemory = PropertyUtil.checkAndGetProperty(resource.getProperties(), TASK_MANAGER_MEMORY_PROPERTY); - String jobManagerMemory = PropertyUtil.checkAndGetProperty(getContainerProperties(), JOB_MANAGER_MEMORY_PROPERTY); + String jobManagerMemory = PropertyUtil.checkAndGetProperty(resource.getProperties(), JOB_MANAGER_MEMORY_PROPERTY); String jobPath = getAMSHome() + "/plugin/optimize/OptimizeJob.jar"; return String.format("%s/bin/flink run -m yarn-cluster -ytm %s -yjm %s -c %s %s -m %s %s", getFlinkHome(), taskManagerMemory, jobManagerMemory, diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java b/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java index ca4e8f3abe..4bc63145bf 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java +++ b/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java @@ -100,7 +100,7 @@ private OptimizingTaskResult executeTask(OptimizingTask task) { ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output); OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), threadId); result.setTaskOutput(outputByteBuffer); - result.setSummary(result.summary); + result.setSummary(output.summary()); LOG.info("Optimizer executor[{}] executed task[{}]", threadId, task.getTaskId()); return result; } catch (Throwable t) { diff --git a/ams/server/pom.xml b/ams/server/pom.xml index 2ef2ecee45..ddf1cba08e 100644 --- a/ams/server/pom.xml +++ b/ams/server/pom.xml @@ -380,6 +380,13 @@ + + org.apache.iceberg + iceberg-data + ${iceberg.version} + tests + test + diff --git a/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java b/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java index bfa2ede539..c26fad18fb 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/ArcticServiceContainer.java @@ -20,7 +20,6 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.google.common.annotations.VisibleForTesting; import com.netease.arctic.ams.api.ArcticTableMetastore; import com.netease.arctic.ams.api.Constants; import com.netease.arctic.ams.api.Environments; @@ -34,9 +33,7 @@ import com.netease.arctic.server.resource.ContainerMetadata; import com.netease.arctic.server.resource.ResourceContainers; import com.netease.arctic.server.table.DefaultTableService; -import com.netease.arctic.server.table.TableService; import com.netease.arctic.server.table.executor.AsyncTableExecutors; -import com.netease.arctic.server.utils.ConfigOption; import com.netease.arctic.server.utils.Configurations; import com.netease.arctic.server.utils.ThriftServiceProxy; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -146,25 +143,6 @@ public void dispose() { optimizingService = null; } - public boolean isStarted() { - return server != null && server.isServing(); - } - - @VisibleForTesting - void setConfig(ConfigOption option, Object value) { - serviceConfig.set(option, value); - } - - @VisibleForTesting - TableService getTableService() { - return this.tableService; - } - - @VisibleForTesting - DefaultOptimizingService getOptimizingService() { - return this.optimizingService; - } - private void 
initConfig() throws IOException { LOG.info("initializing configurations..."); new ConfigurationHelper().init(); @@ -254,8 +232,10 @@ private void initResourceGroupConfig() { "can not find such container config named" + groupBuilder.getContainer()); } - if (groupConfig.containsKey(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES)) { - groupBuilder.addProperties(groupConfig.getObject(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES, + if (groupConfig.containsKey(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES) && + groupConfig.get(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES) != null) { + groupBuilder.addProperties(groupConfig.getObject( + ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES, Map.class)); } resourceGroups.add(groupBuilder.build()); diff --git a/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java b/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java index 71c2106bb7..6b679f6a6f 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java +++ b/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java @@ -68,7 +68,7 @@ public class DefaultOptimizingService extends DefaultResourceManager private final Map optimizingQueueByGroup = new ConcurrentHashMap<>(); private final Map optimizingQueueByToken = new ConcurrentHashMap<>(); private final TableManager tableManager; - private RuntimeHandlerChain tableHandlerChain; + private final RuntimeHandlerChain tableHandlerChain; private Timer optimizerMonitorTimer; public DefaultOptimizingService(DefaultTableService tableService, List resourceGroups) { @@ -108,9 +108,12 @@ public void touch(String authToken) { @Override public OptimizingTask pollTask(String authToken, int threadId) { - LOG.info("Optimizer {} polling task", authToken); OptimizingQueue queue = getQueueByToken(authToken); - return queue.pollTask(authToken, threadId); + OptimizingTask task = queue.pollTask(authToken, threadId); + if (task != null) { + LOG.info("Optimizer {} polling task", authToken); + } + return task; } @Override diff --git a/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/SqlExample.java b/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/SqlExample.java index 619581b5ad..c8701d3199 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/SqlExample.java +++ b/ams/server/src/main/java/com/netease/arctic/server/dashboard/model/SqlExample.java @@ -30,7 +30,7 @@ public enum SqlExample { " primary key (id)\n" + ") using arctic \n" + "partitioned by (days(ts)) \n" + - "tblproperties (\"table.props\" = \"val\");"), + "tblproperties ('table.props' = 'val');"), DELETE_TABLE("DeleteTable", "drop table db_name.table_name;"), EDIT_TABLE("EditTable", diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java index ae3b2761aa..783d8b1066 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java @@ -374,9 +374,9 @@ public void acceptResult(TaskRuntime taskRuntime) { throw new OptimizingClosedException(processId); } if (taskRuntime.getStatus() == TaskRuntime.Status.SUCCESS && allTasksPrepared()) { - tableRuntime.beginCommitting(); this.metricsSummary.addNewFileCnt(taskRuntime.getSummary().getNewFileCnt()); 
this.metricsSummary.addNewFileSize(taskRuntime.getSummary().getNewFileSize()); + tableRuntime.beginCommitting(); } else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) { if (taskRuntime.getRetry() <= tableRuntime.getMaxExecuteRetryCount()) { retryTask(taskRuntime, true); diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java b/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java index 201f9d201a..a37f21ef12 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/DefaultTableService.java @@ -46,7 +46,6 @@ public class DefaultTableService extends PersistentBase implements TableService private final Map internalCatalogMap = new ConcurrentHashMap<>(); private final Map externalCatalogMap = new ConcurrentHashMap<>(); private final Map tableRuntimeMap = new ConcurrentHashMap<>(); - private volatile boolean started = false; private RuntimeHandlerChain headHandler; private Timer tableExplorerTimer; diff --git a/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java b/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java index db432cbc3e..c5d096818d 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java +++ b/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java @@ -7,17 +7,20 @@ import com.netease.arctic.catalog.ArcticCatalog; import com.netease.arctic.catalog.CatalogLoader; import com.netease.arctic.catalog.CatalogTestHelpers; -import com.netease.arctic.hive.TestHMS; +import com.netease.arctic.hive.HMSMockServer; import com.netease.arctic.optimizer.local.LocalOptimizer; import com.netease.arctic.server.resource.ResourceContainers; import com.netease.arctic.server.table.DefaultTableService; +import com.netease.arctic.server.utils.Configurations; import com.netease.arctic.table.TableIdentifier; import org.apache.commons.io.FileUtils; import org.apache.curator.shaded.com.google.common.io.MoreFiles; import org.apache.curator.shaded.com.google.common.io.RecursiveDeleteOption; import org.apache.iceberg.common.DynFields; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; +import org.junit.rules.TemporaryFolder; import org.kohsuke.args4j.CmdLineException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; public class AmsEnvironment { @@ -39,10 +43,12 @@ public class AmsEnvironment { private final String rootPath; private static final String DEFAULT_ROOT_PATH = "/tmp/arctic_integration"; private static final String OPTIMIZE_GROUP = "default"; - private ArcticServiceContainer arcticService; - private AtomicBoolean amsExit; + private final ArcticServiceContainer arcticService; + private Configurations serviceConfig; + private DefaultTableService tableService; + private final AtomicBoolean amsExit; private int thriftBindPort; - private final TestHMS testHMS; + private final HMSMockServer testHMS; private final Map catalogs = new HashMap<>(); public static final String ICEBERG_CATALOG = "iceberg_catalog"; public static String ICEBERG_CATALOG_DIR = "/iceberg/warehouse"; @@ -54,8 +60,6 @@ public static void main(String[] args) throws Exception { 
AmsEnvironment amsEnvironment = new AmsEnvironment(); amsEnvironment.start(); amsEnvironment.startOptimizer(); - Thread.sleep(2 * 60 * 1000); - amsEnvironment.stopOptimizer(); } public AmsEnvironment() throws Exception { @@ -71,22 +75,34 @@ public AmsEnvironment(String rootPath) throws Exception { System.setProperty("derby.init.sql.dir", path + "../classes/sql/derby/"); amsExit = new AtomicBoolean(false); arcticService = new ArcticServiceContainer(); - testHMS = new TestHMS(); + TemporaryFolder hiveDir = new TemporaryFolder(); + hiveDir.create(); + testHMS = new HMSMockServer(hiveDir.newFile()); + testHMS.start(); } public void start() throws Exception { startAms(); - DynFields.UnboundField field = - DynFields.builder().hiddenImpl(DefaultTableService.class, "started").build(); - boolean tableServiceIsStart = field.bind(arcticService.getTableService()).get(); + DynFields.UnboundField amsTableServiceField = + DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "tableService").build(); + tableService = amsTableServiceField.bind(arcticService).get(); + DynFields.UnboundField> tableServiceField = + DynFields.builder().hiddenImpl(DefaultTableService.class, "initialized").build(); + boolean tableServiceIsStart = false; long startTime = System.currentTimeMillis(); while (!tableServiceIsStart) { if (System.currentTimeMillis() - startTime > 10000) { throw new RuntimeException("table service not start yet after 10s"); } + try { + tableServiceField.bind(tableService).get().get(); + tableServiceIsStart = true; + } catch (RuntimeException e) { + LOG.info("table service not start yet"); + } Thread.sleep(1000); - tableServiceIsStart = field.bind(arcticService.getTableService()).get(); } + initCatalog(); } @@ -95,6 +111,7 @@ public void stop() throws IOException { if (this.arcticService != null) { this.arcticService.dispose(); } + testHMS.stop(); MoreFiles.deleteRecursively(Paths.get(rootPath), RecursiveDeleteOption.ALLOW_INSECURE); } @@ -103,10 +120,10 @@ public ArcticCatalog catalog(String name) { } public boolean tableExist(TableIdentifier tableIdentifier) { - return arcticService.getTableService().tableExist(tableIdentifier.buildTableIdentifier()); + return tableService.tableExist(tableIdentifier.buildTableIdentifier()); } - public TestHMS getTestHMS() { + public HMSMockServer getTestHMS() { return testHMS; } @@ -123,7 +140,7 @@ private void createIcebergCatalog() { properties.put(CatalogMetaProperties.KEY_WAREHOUSE, warehouseDir); CatalogMeta catalogMeta = CatalogTestHelpers.buildCatalogMeta(ICEBERG_CATALOG, CatalogMetaProperties.CATALOG_TYPE_HADOOP, properties, TableFormat.ICEBERG); - arcticService.getTableService().createCatalog(catalogMeta); + tableService.createCatalog(catalogMeta); catalogs.put(ICEBERG_CATALOG, CatalogLoader.load(getAmsUrl() + "/" + ICEBERG_CATALOG)); } @@ -134,15 +151,15 @@ private void createMixIcebergCatalog() { properties.put(CatalogMetaProperties.KEY_WAREHOUSE, warehouseDir); CatalogMeta catalogMeta = CatalogTestHelpers.buildCatalogMeta(MIXED_ICEBERG_CATALOG, CatalogMetaProperties.CATALOG_TYPE_AMS, properties, TableFormat.MIXED_ICEBERG); - arcticService.getTableService().createCatalog(catalogMeta); + tableService.createCatalog(catalogMeta); catalogs.put(MIXED_ICEBERG_CATALOG, CatalogLoader.load(getAmsUrl() + "/" + MIXED_ICEBERG_CATALOG)); } private void createMixHiveCatalog() { Map properties = Maps.newHashMap(); CatalogMeta catalogMeta = CatalogTestHelpers.buildHiveCatalogMeta(MIXED_HIVE_CATALOG, - properties, testHMS.getHiveConf(), TableFormat.MIXED_HIVE); - 
arcticService.getTableService().createCatalog(catalogMeta); + properties, testHMS.hiveConf(), TableFormat.MIXED_HIVE); + tableService.createCatalog(catalogMeta); catalogs.put(MIXED_HIVE_CATALOG, CatalogLoader.load(getAmsUrl() + "/" + MIXED_HIVE_CATALOG)); } @@ -167,7 +184,9 @@ public void startOptimizer() { } public void stopOptimizer() { - arcticService.getOptimizingService().listOptimizers() + DynFields.UnboundField field = + DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "optimizingService").build(); + field.bind(arcticService).get().listOptimizers() .forEach(resource -> { ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource); }); @@ -185,8 +204,11 @@ private void startAms() throws Exception { try { LOG.info("start ams"); genThriftBindPort(); - arcticService.setConfig(ArcticManagementConf.THRIFT_BIND_PORT, thriftBindPort); - arcticService.setConfig(ArcticManagementConf.EXTERNAL_CATALOG_REFRESH_INTERVAL, 5 * 1000L); + DynFields.UnboundField field = + DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "serviceConfig").build(); + serviceConfig = field.bind(arcticService).get(); + serviceConfig.set(ArcticManagementConf.THRIFT_BIND_PORT, thriftBindPort); + serviceConfig.set(ArcticManagementConf.EXTERNAL_CATALOG_REFRESH_INTERVAL, 1000L); // when AMS is successfully running, this thread will wait here arcticService.startService(); break; @@ -213,12 +235,15 @@ private void startAms() throws Exception { }, "ams-runner"); amsRunner.start(); + DynFields.UnboundField amsServerField = + DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "server").build(); while (true) { if (amsExit.get()) { LOG.error("ams exit"); break; } - if (arcticService.isStarted()) { + TServer thriftServer = amsServerField.bind(arcticService).get(); + if (thriftServer != null && thriftServer.isServing()) { LOG.info("ams start"); break; } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/AbstractOptimizingTest.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/AbstractOptimizingTest.java index ba3b25463b..dcb82d4d49 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/AbstractOptimizingTest.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/AbstractOptimizingTest.java @@ -28,22 +28,23 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Multimap; -import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Pair; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +60,6 @@ import java.util.List; import 
java.util.Set; -import static com.netease.arctic.IcebergTableTestHelper.writeEqDeleteFile; -import static com.netease.arctic.IcebergTableTestHelper.writeNewDataFile; -import static com.netease.arctic.IcebergTableTestHelper.writePosDeleteFile; - public abstract class AbstractOptimizingTest { private static final Logger LOG = LoggerFactory.getLogger(AbstractOptimizingTest.class); @@ -85,13 +82,19 @@ protected static Record newRecord(Schema schema, Object... val) { protected static DataFile insertDataFile(Table table, List records, StructLike partitionData) throws IOException { - DataFile result = writeNewDataFile(table, records, partitionData); + OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 0, 1) + .format(FileFormat.PARQUET).build(); + DataFile dataFile = FileHelpers.writeDataFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + records); AppendFiles baseAppend = table.newAppend(); - baseAppend.appendFile(result); + baseAppend.appendFile(dataFile); baseAppend.commit(); - return result; + return dataFile; } protected static long getDataFileSize(Table table) { @@ -120,9 +123,24 @@ protected static void updateProperties(ArcticTable table, String key, String val protected static void rowDelta(Table table, List insertRecords, List deleteRecords, StructLike partitionData) throws IOException { - DataFile dataFile = writeNewDataFile(table, insertRecords, partitionData); - - DeleteFile deleteFile = writeEqDeleteFile(table, deleteRecords, partitionData); + // DataFile dataFile = writeNewDataFile(table, insertRecords, partitionData); + OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 0, 1) + .format(FileFormat.PARQUET).build(); + DataFile dataFile = FileHelpers.writeDataFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + insertRecords); + + Schema eqDeleteRowSchema = table.schema().select("id"); + DeleteFile deleteFile = FileHelpers.writeDeleteFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + deleteRecords, + eqDeleteRowSchema); + + // DeleteFile deleteFile = writeEqDeleteFile(table, deleteRecords, partitionData); RowDelta rowDelta = table.newRowDelta(); rowDelta.addRows(dataFile); rowDelta.addDeletes(deleteFile); @@ -132,23 +150,50 @@ protected static void rowDelta(Table table, List insertRecords, List records, StructLike partitionData) throws IOException { - DeleteFile result = writeEqDeleteFile(table, records, partitionData); + Schema eqDeleteRowSchema = table.schema().select("id"); + OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 0, 1) + .format(FileFormat.PARQUET).build(); + DeleteFile deleteFile = FileHelpers.writeDeleteFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + records, + eqDeleteRowSchema); + // DeleteFile result = writeEqDeleteFile(table, records, partitionData); RowDelta rowDelta = table.newRowDelta(); - rowDelta.addDeletes(result); + rowDelta.addDeletes(deleteFile); rowDelta.commit(); - return result; + return deleteFile; } protected static void rowDeltaWithPos(Table table, List insertRecords, List deleteRecords, StructLike partitionData) throws IOException { - DataFile dataFile = writeNewDataFile(table, insertRecords, partitionData); - - DeleteFile deleteFile = writeEqDeleteFile(table, deleteRecords, partitionData); - Multimap - file2Positions = 
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList); - file2Positions.put(dataFile.path().toString(), 0L); - DeleteFile posDeleteFile = writePosDeleteFile(table, file2Positions, partitionData); + OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 0, 1) + .format(FileFormat.PARQUET).build(); + + DataFile dataFile = FileHelpers.writeDataFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + insertRecords); + + Schema eqDeleteRowSchema = table.schema().select("id"); + DeleteFile deleteFile = FileHelpers.writeDeleteFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + deleteRecords, + eqDeleteRowSchema); + + List> file2Positions = Lists.newArrayList(); + file2Positions.add(Pair.of(dataFile.path().toString(), 0L)); + + DeleteFile posDeleteFile = FileHelpers.writeDeleteFile( + table, + outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(), + partitionData, + file2Positions).first(); RowDelta rowDelta = table.newRowDelta(); rowDelta.addRows(dataFile); rowDelta.addDeletes(deleteFile); diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/BaseOptimizingChecker.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/BaseOptimizingChecker.java index dd3fe272c8..97074cb9f1 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/BaseOptimizingChecker.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/BaseOptimizingChecker.java @@ -17,6 +17,7 @@ import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import java.util.stream.Collectors; public class BaseOptimizingChecker extends PersistentBase { @@ -93,7 +94,10 @@ protected TableOptimizingProcess waitOptimizeResult() { return Status.RUNNING; } Optional any = - tableOptimizingProcesses.stream().filter(p -> p.getProcessId() > lastProcessId).findAny(); + tableOptimizingProcesses.stream() + .filter(p -> p.getProcessId() > lastProcessId) + .filter(p -> p.getStatus().equals(OptimizingProcess.Status.SUCCESS.name())) + .findAny(); if (any.isPresent()) { return Status.SUCCESS; @@ -112,17 +116,17 @@ protected TableOptimizingProcess waitOptimizeResult() { } if (success) { - Optional result = getAs( + List result = getAs( OptimizingMapper.class, mapper -> mapper.selectSuccessOptimizingProcesses(tableIdentifier.getCatalog(), tableIdentifier.getDatabase(), tableIdentifier.getTableName())).stream() - .filter(p -> p.getProcessId() > lastProcessId) - .findAny(); - if (result.isPresent()) { - this.lastProcessId = result.get().getProcessId(); - return result.get(); + .filter(p -> p.getProcessId() > lastProcessId).collect(Collectors.toList()); + result.sort(Comparator.comparingLong(TableOptimizingProcess::getProcessId).reversed()); + if (result.size() == 1) { + this.lastProcessId = result.get(0).getProcessId(); + return result.get(0); } else { - return null; + throw new RuntimeException("optimize result size " + result.size()); } } else { return null; @@ -138,14 +142,9 @@ protected void assertOptimizeHangUp() { List tableOptimizingProcesses = getAs( OptimizingMapper.class, mapper -> mapper.selectSuccessOptimizingProcesses(tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), tableIdentifier.getTableName())); - if (tableOptimizingProcesses == null || tableOptimizingProcesses.isEmpty()) { - return; - } - Optional any = - tableOptimizingProcesses.stream().filter(p -> p.getProcessId() > 
lastProcessId).findAny(); - any.ifPresent(h -> LOG.error("{} get unexpected optimize process {} {}", tableIdentifier, lastProcessId, h)); - Assert.assertFalse("optimize is not stopped", any.isPresent()); + tableIdentifier.getDatabase(), tableIdentifier.getTableName())).stream() + .filter(p -> p.getProcessId() > lastProcessId).collect(Collectors.toList()); + Assert.assertFalse("optimize is not stopped", tableOptimizingProcesses.size() > 0); } protected boolean waitUntilFinish(Supplier statusSupplier, final long timeout) diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestIcebergHadoopOptimizing.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestIcebergHadoopOptimizing.java index 9268778982..cf3385a1b4 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestIcebergHadoopOptimizing.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestIcebergHadoopOptimizing.java @@ -18,17 +18,24 @@ package com.netease.arctic.server.optimizing; +import com.netease.arctic.iceberg.InternalRecordWrapper; import com.netease.arctic.server.dashboard.model.TableOptimizingProcess; import com.netease.arctic.table.TableIdentifier; import com.netease.arctic.table.TableProperties; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import java.io.IOException; - -import static com.netease.arctic.IcebergTableTestHelper.partitionData; +import java.util.List; +import java.util.Set; public class TestIcebergHadoopOptimizing extends AbstractOptimizingTest { private final Table table; @@ -85,7 +92,7 @@ public void testIcebergTableOptimizing() throws IOException { // wait Minor Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 3, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); assertIds(readRecords(table), 4, 5, 6); updateProperties(table, TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT, "10"); @@ -103,7 +110,7 @@ public void testIcebergTableOptimizing() throws IOException { // wait FullMajor Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 4, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 4, 1); assertIds(readRecords(table), 5, 6, 7, 8); checker.assertOptimizeHangUp(); @@ -189,7 +196,7 @@ public void testPartitionIcebergTableOptimizing() throws IOException { // wait Minor Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 3, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); assertIds(readRecords(table), 4, 5, 6); updateProperties(table, TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT, "10"); @@ -203,7 +210,7 @@ public void testPartitionIcebergTableOptimizing() throws IOException { // wait FullMajor Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 4, 1); + 
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 4, 1); assertIds(readRecords(table), 5, 6, 7, 8); @@ -273,8 +280,6 @@ public void testIcebergTableFullOptimize() throws IOException { } public void testPartitionIcebergTablePartialOptimizing() throws IOException { - updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "false"); - // Step 1: insert 6 data files for two partitions StructLike partitionData1 = partitionData(table.schema(), table.spec(), quickDateWithZone(1)); insertDataFile(table, Lists.newArrayList( @@ -300,13 +305,10 @@ public void testPartitionIcebergTablePartialOptimizing() throws IOException { updateProperties(table, TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT, "2"); updateProperties(table, TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT, "4"); - updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true"); // wait Minor Optimize result TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 2); - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 1); assertIds(readRecords(table), 1, 2, 3, 4, 5, 6); checker.assertOptimizeHangUp(); @@ -315,4 +317,29 @@ public void testPartitionIcebergTablePartialOptimizing() throws IOException { private Record newRecord(Object... val) { return newRecord(table.schema(), val); } + + private StructLike partitionData(Schema tableSchema, PartitionSpec spec, Object... partitionValues) { + GenericRecord record = GenericRecord.create(tableSchema); + int index = 0; + Set partitionField = Sets.newHashSet(); + spec.fields().forEach(f -> partitionField.add(f.sourceId())); + List tableFields = tableSchema.columns(); + for (int i = 0; i < tableFields.size(); i++) { + // String sourceColumnName = tableSchema.findColumnName(i); + Types.NestedField sourceColumn = tableFields.get(i); + if (partitionField.contains(sourceColumn.fieldId())) { + Object partitionVal = partitionValues[index]; + index++; + record.set(i, partitionVal); + } else { + record.set(i, 0); + } + } + + PartitionKey pd = new PartitionKey(spec, tableSchema); + InternalRecordWrapper wrapper = new InternalRecordWrapper(tableSchema.asStruct()); + wrapper = wrapper.wrap(record); + pd.partition(wrapper); + return pd; + } } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedHiveOptimizing.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedHiveOptimizing.java index 397aac51c7..4612d15e67 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedHiveOptimizing.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedHiveOptimizing.java @@ -58,7 +58,7 @@ public void testHiveKeyedTableMajorOptimizeNotMove() throws TException, IOExcept checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 1, 1); assertIdRange(readRecords(table), 1, 100); // assert file are in hive location - assertIdRange(readHiveTableData(), 1, 100); + // assertIdRange(readHiveTableData(), 1, 100); // Step2: write 1 change delete record writeChange(table, null, Lists.newArrayList( @@ -68,26 +68,18 @@ public void testHiveKeyedTableMajorOptimizeNotMove() throws TException, IOExcept optimizeHistory = checker.waitOptimizeResult(); checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); 
assertIdRange(readRecords(table), 2, 100); - assertIdRange(readHiveTableData(), 1, 100); + // assertIdRange(readHiveTableData(), 1, 100); // Step3: write 2 small files to base writeBase(table, rangeFromTo(101, 102, "aaa", quickDateWithZone(3))); // should not optimize with 1 small file - checker.assertOptimizeHangUp(); + optimizeHistory = checker.waitOptimizeResult(); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); writeBase(table, rangeFromTo(103, 104, "aaa", quickDateWithZone(3))); // wait Major Optimize result, generate 1 data file from 2 small files, but not move to hive location optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 2, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 3, 1); assertIdRange(readRecords(table), 2, 104); - assertIdRange(readHiveTableData(), 1, 100); - - // Step3: change Full optimize trigger condition - updateProperties(table, TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0.0"); - // wait Full Optimize result, generate 1 data file from 2 small files, but not move to hive location - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 3, 1); - assertIdRange(readRecords(table), 2, 104); - assertIdRange(readHiveTableData(), 2, 104); checker.assertOptimizeHangUp(); } @@ -102,15 +94,15 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 1, 1); assertIdRange(readRecords(table), 1, 100); // assert file are in hive location - assertIdRange(readHiveTableData(), 1, 100); + // assertIdRange(readHiveTableData(), 1, 100); // Step2: write 1 small file to base writeBase(table, rangeFromTo(101, 102, "aaa", quickDateWithZone(3))); // wait Major Optimize result, generate 1 data file from 2 small files, but not move to hive location optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 1, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); assertIdRange(readRecords(table), 1, 102); - assertIdRange(readHiveTableData(), 1, 102); + // assertIdRange(readHiveTableData(), 1, 102); checker.assertOptimizeHangUp(); } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedIcebergOptimizing.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedIcebergOptimizing.java index c814486ee4..ef455d2fa8 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedIcebergOptimizing.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestMixedIcebergOptimizing.java @@ -57,7 +57,7 @@ public void testKeyedTableContinueOptimizing() { // wait Minor Optimize result, no major optimize because there is only 1 base file for each node TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 4); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 1); assertIds(readRecords(table), 3, 4, 5, 6); // Step2: insert change data @@ -70,9 +70,7 @@ public void testKeyedTableContinueOptimizing() { // wait Minor/Major Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 8, 4); - optimizeHistory = 
checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 8, 4); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 8, 1); assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10); // Step3: delete change data @@ -80,10 +78,6 @@ public void testKeyedTableContinueOptimizing() { newRecord(7, "fff", quickDateWithZone(3)), newRecord(8, "ggg", quickDateWithZone(3)) )); - // wait Minor/Major Optimize result - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 2); - assertIds(readRecords(table), 3, 4, 5, 6, 9, 10); // Step4: update change data writeChange(table, Lists.newArrayList( @@ -95,9 +89,11 @@ public void testKeyedTableContinueOptimizing() { )); // wait Minor/Major Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 6, 4); - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 6, 2); + if (arcticTable.spec().isPartitioned()) { + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 1); + } else { + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 10, 1); + } assertIds(readRecords(table), 3, 4, 5, 6, 9, 10); assertNames(readRecords(table), "aaa", "bbb", "eee", "ddd", "hhh_new", "iii_new"); @@ -112,7 +108,11 @@ public void testKeyedTableContinueOptimizing() { )); // wait Minor/Major Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 10, 4); + if (arcticTable.spec().isPartitioned()) { + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 0); + } else { + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 8, 0); + } assertIds(readRecords(table)); // Step6: insert change data @@ -121,9 +121,7 @@ public void testKeyedTableContinueOptimizing() { ), null); // wait Minor Optimize result, no major optimize because there is only 1 base file for each node optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 3, 1); - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 3, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 1, 1); assertIds(readRecords(table), 11); checker.assertOptimizeHangUp(); } @@ -149,12 +147,30 @@ public void testPkTableMajorOptimizeLeftPosDelete() { newRecord(10, "ggg" + longString, quickDateWithZone(4)), newRecord(14, "hhh" + longString, quickDateWithZone(4)) )); + writeBase(table, Lists.newArrayList( + newRecord(3, "eee" + longString, quickDateWithZone(3)), + newRecord(7, "fff" + longString, quickDateWithZone(3)), + newRecord(11, "ggg" + longString, quickDateWithZone(4)), + newRecord(15, "hhh" + longString, quickDateWithZone(4)) + )); + writeBase(table, Lists.newArrayList( + newRecord(4, "eee" + longString, quickDateWithZone(3)), + newRecord(8, "fff" + longString, quickDateWithZone(3)), + newRecord(12, "ggg" + longString, quickDateWithZone(4)), + newRecord(16, "hhh" + longString, quickDateWithZone(4)) + )); + writeBase(table, Lists.newArrayList( + newRecord(17, "eee" + longString, quickDateWithZone(3)), + newRecord(21, "fff" + longString, quickDateWithZone(3)), + newRecord(25, "ggg" + longString, quickDateWithZone(4)), + newRecord(29, "hhh" + longString, 
quickDateWithZone(4)) + )); updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true"); updateProperties(table, TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "1000"); TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 2, 2); - assertIds(readRecords(table), 1, 5, 9, 13, 2, 6, 10, 14); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL_MAJOR, 5, 4); + assertIds(readRecords(table), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 21, 25, 29); updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "false"); updateProperties(table, TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "-1"); @@ -179,17 +195,14 @@ public void testPkTableMajorOptimizeLeftPosDelete() { newRecord(1, "aaa_new2", quickDateWithZone(3)) )); - assertIds(readRecords(table), 5, 9, 13, 2, 6, 10, 14); + assertIds(readRecords(table), 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 21, 25, 29); updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true"); // wait Minor Optimize result, no major optimize because there is only 1 base file for each node optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 3); - - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 3, 0); - assertIds(readRecords(table), 5, 9, 13, 2, 6, 10, 14); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 10, 4); + assertIds(readRecords(table), 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 21, 25, 29); checker.assertOptimizeHangUp(); } @@ -214,7 +227,7 @@ public void testNoPkPartitionTableOptimizing() { )); // wait Major Optimize result TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 4, 2); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 1); assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10); // Step 3: insert data @@ -226,7 +239,7 @@ public void testNoPkPartitionTableOptimizing() { )); // wait Major Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 4, 2); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 1); assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14); checker.assertOptimizeHangUp(); @@ -252,7 +265,7 @@ public void testNoPkTableOptimizing() { )); // wait Major Optimize result TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 2, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10); // Step 3: insert data @@ -264,7 +277,7 @@ public void testNoPkTableOptimizing() { )); // wait Major Optimize result optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 2, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 1); assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14); checker.assertOptimizeHangUp(); @@ -312,9 +325,7 @@ public void testKeyedTableTxIdNotInOrder() { TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); 
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 1, 1); optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 5); - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 6, 1); + checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 1); table.refresh(); assertIds(readRecords(table), 1, 2, 3); @@ -323,40 +334,6 @@ public void testKeyedTableTxIdNotInOrder() { checker.assertOptimizeHangUp(); } - public void testPartitionArcticTablePartialOptimizing() { - KeyedTable table = arcticTable.asKeyedTable(); - emptyCommit(table); - emptyCommit(table); - emptyCommit(table); - emptyCommit(table); - - // Step1: insert base data - List dataFiles = writeBase(table, Lists.newArrayList( - newRecord(1, "aaa", quickDateWithZone(3)), - newRecord(2, "bbb", quickDateWithZone(4)), - newRecord(3, "ccc", quickDateWithZone(5)) - )); - - long maxFileSizeLimit = dataFiles.get(0).fileSizeInBytes() * 4 - 100; - updateProperties(table, TableProperties.SELF_OPTIMIZING_MAX_FILE_SIZE_BYTES, maxFileSizeLimit + ""); - - // Step2: insert base data - writeBase(table, Lists.newArrayList( - newRecord(1, "aaa", quickDateWithZone(3)), - newRecord(2, "bbb", quickDateWithZone(4)), - newRecord(3, "ccc", quickDateWithZone(5)) - )); - - // wait Major Optimize result - TableOptimizingProcess optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 4, 2); - optimizeHistory = checker.waitOptimizeResult(); - checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MAJOR, 2, 1); - assertIds(readRecords(table), 1, 1, 2, 2, 3, 3); - - checker.assertOptimizeHangUp(); - } - private Record newRecord(Object... 
val) { return newRecord(arcticTable.schema(), val); } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestOptimizingIntegration.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestOptimizingIntegration.java index 2c2c69a96f..83bbea8cb3 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestOptimizingIntegration.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/TestOptimizingIntegration.java @@ -60,6 +60,8 @@ public class TestOptimizingIntegration { TableIdentifier.of(AmsEnvironment.MIXED_ICEBERG_CATALOG, DATABASE, "mix_iceberg_table5"); private static final TableIdentifier MIXED_ICEBERG_TB_6 = TableIdentifier.of(AmsEnvironment.MIXED_ICEBERG_CATALOG, DATABASE, "mix_iceberg_table6"); + private static final TableIdentifier MIXED_ICEBERG_TB_7 = + TableIdentifier.of(AmsEnvironment.MIXED_ICEBERG_CATALOG, DATABASE, "mix_iceberg_table7"); private static final TableIdentifier MIXED_HIVE_TB_1 = TableIdentifier.of(AmsEnvironment.MIXED_HIVE_CATALOG, DATABASE, "mix_hive_table1"); private static final TableIdentifier MIXED_HIVE_TB_2 = TableIdentifier.of(AmsEnvironment.MIXED_HIVE_CATALOG, @@ -80,6 +82,9 @@ public static void before() throws Exception { amsEnv = new AmsEnvironment(rootPath); amsEnv.start(); amsEnv.startOptimizer(); + amsEnv.catalog(AmsEnvironment.ICEBERG_CATALOG).createDatabase(DATABASE); + amsEnv.catalog(AmsEnvironment.MIXED_ICEBERG_CATALOG).createDatabase(DATABASE); + amsEnv.catalog(AmsEnvironment.MIXED_HIVE_CATALOG).createDatabase(DATABASE); } @AfterAll @@ -176,21 +181,13 @@ public void testKeyedTableTxIdNotInOrder() { testCase.testKeyedTableTxIdNotInOrder(); } - @Test - public void testPartitionArcticTablePartialOptimizing() { - ArcticTable arcticTable = createArcticTable(MIXED_ICEBERG_TB_6, PRIMARY_KEY, SPEC); - assertTableExist(MIXED_ICEBERG_TB_6); - TestMixedIcebergOptimizing testCase = new TestMixedIcebergOptimizing(arcticTable); - testCase.testPartitionArcticTablePartialOptimizing(); - } - @Test public void testHiveKeyedTableMajorOptimizeNotMove() throws TException, IOException { createHiveArcticTable(MIXED_HIVE_TB_1, PRIMARY_KEY, PartitionSpec.unpartitioned()); assertTableExist(MIXED_HIVE_TB_1); KeyedTable table = amsEnv.catalog(AmsEnvironment.MIXED_HIVE_CATALOG).loadTable(MIXED_HIVE_TB_1).asKeyedTable(); TestMixedHiveOptimizing testCase = - new TestMixedHiveOptimizing(table, amsEnv.getTestHMS().getHiveClient()); + new TestMixedHiveOptimizing(table, amsEnv.getTestHMS().getClient()); testCase.testHiveKeyedTableMajorOptimizeNotMove(); } @@ -200,7 +197,7 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept assertTableExist(MIXED_HIVE_TB_2); KeyedTable table = amsEnv.catalog(AmsEnvironment.MIXED_HIVE_CATALOG).loadTable(MIXED_HIVE_TB_2).asKeyedTable(); TestMixedHiveOptimizing testCase = - new TestMixedHiveOptimizing(table, amsEnv.getTestHMS().getHiveClient()); + new TestMixedHiveOptimizing(table, amsEnv.getTestHMS().getClient()); testCase.testHiveKeyedTableMajorOptimizeAndMove(); } diff --git a/core/src/test/java/com/netease/arctic/IcebergTableTestHelper.java b/core/src/test/java/com/netease/arctic/IcebergTableTestHelper.java deleted file mode 100644 index 4bdcd66aa8..0000000000 --- a/core/src/test/java/com/netease/arctic/IcebergTableTestHelper.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.netease.arctic; - -import com.netease.arctic.iceberg.InternalRecordWrapper; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import 
org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.RowDelta; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; -import org.apache.iceberg.data.GenericAppenderFactory; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.deletes.EqualityDeleteWriter; -import org.apache.iceberg.deletes.PositionDelete; -import org.apache.iceberg.deletes.PositionDeleteWriter; -import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Multimap; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.ArrayUtil; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class IcebergTableTestHelper { - - public static DataFile writeNewDataFile(Table table, List records, StructLike partitionData) - throws IOException { - GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()); - OutputFileFactory outputFileFactory = - OutputFileFactory.builderFor(table, table.spec().specId(), 1) - .build(); - EncryptedOutputFile outputFile = outputFileFactory.newOutputFile(); - - DataWriter writer = appenderFactory - .newDataWriter(outputFile, FileFormat.PARQUET, partitionData); - - for (Record record : records) { - writer.write(record); - } - writer.close(); - return writer.toDataFile(); - } - - public static DeleteFile writeEqDeleteFile(Table table, List records, StructLike partitionData) - throws IOException { - List equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId()); - Schema eqDeleteRowSchema = table.schema().select("id"); - GenericAppenderFactory appenderFactory = - new GenericAppenderFactory(table.schema(), table.spec(), - ArrayUtil.toIntArray(equalityFieldIds), eqDeleteRowSchema, null); - OutputFileFactory outputFileFactory = - OutputFileFactory.builderFor(table, table.spec().specId(), 1) - .build(); - EncryptedOutputFile outputFile = outputFileFactory.newOutputFile(table.spec(), partitionData); - - EqualityDeleteWriter writer = appenderFactory - .newEqDeleteWriter(outputFile, FileFormat.PARQUET, partitionData); - - for (Record record : records) { - writer.write(record); - } - writer.close(); - return writer.toDeleteFile(); - } - - public static DeleteFile writePosDeleteFile( - Table table, Multimap file2Positions, - StructLike partitionData) throws IOException { - GenericAppenderFactory appenderFactory = - new GenericAppenderFactory(table.schema(), table.spec()); - OutputFileFactory outputFileFactory = - OutputFileFactory.builderFor(table, table.spec().specId(), 1) - .build(); - EncryptedOutputFile outputFile = outputFileFactory.newOutputFile(table.spec(), partitionData); - - PositionDeleteWriter writer = appenderFactory - .newPosDeleteWriter(outputFile, FileFormat.PARQUET, partitionData); - for (Map.Entry> entry : file2Positions.asMap().entrySet()) { - String filePath = entry.getKey(); - Collection positions = entry.getValue(); - for (Long position : positions) { - PositionDelete positionDelete = PositionDelete.create(); - positionDelete.set(filePath, position, 
null); - writer.write(positionDelete); - } - } - writer.close(); - return writer.toDeleteFile(); - } - - public static void rowDelta(Table table, List insertRecords, List deleteRecords, - StructLike partitionData) - throws IOException { - DataFile dataFile = writeNewDataFile(table, insertRecords, partitionData); - - DeleteFile deleteFile = writeEqDeleteFile(table, deleteRecords, partitionData); - RowDelta rowDelta = table.newRowDelta(); - rowDelta.addRows(dataFile); - rowDelta.addDeletes(deleteFile); - rowDelta.validateFromSnapshot(table.currentSnapshot().snapshotId()); - rowDelta.commit(); - } - - public static StructLike partitionData(Schema tableSchema, PartitionSpec spec, Object... partitionValues) { - GenericRecord record = GenericRecord.create(tableSchema); - int index = 0; - Set partitionField = Sets.newHashSet(); - spec.fields().forEach(f -> partitionField.add(f.sourceId())); - List tableFields = tableSchema.columns(); - for (int i = 0; i < tableFields.size(); i++) { - // String sourceColumnName = tableSchema.findColumnName(i); - Types.NestedField sourceColumn = tableFields.get(i); - if (partitionField.contains(sourceColumn.fieldId())) { - Object partitionVal = partitionValues[index]; - index++; - record.set(i, partitionVal); - } else { - record.set(i, 0); - } - } - - PartitionKey pd = new PartitionKey(spec, tableSchema); - InternalRecordWrapper wrapper = new InternalRecordWrapper(tableSchema.asStruct()); - wrapper = wrapper.wrap(record); - pd.partition(wrapper); - return pd; - } -}
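
Note on the test-helper refactor above: the patch deletes `IcebergTableTestHelper` and has `AbstractOptimizingTest` write data, equality-delete, and position-delete files directly through Iceberg's `org.apache.iceberg.data.FileHelpers` and `OutputFileFactory` (hence the new `iceberg-data` tests dependency in `ams/server/pom.xml`). The following is a minimal, self-contained sketch of that pattern, assuming an already-created Iceberg `Table` with an `id` column; the class name and the helper signature are illustrative placeholders, not part of the patch.

```java
import java.io.IOException;
import java.util.List;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFileFactory;

public class FileHelpersSketch {

  /**
   * Writes one Parquet data file and one equality-delete file (keyed on the "id" column,
   * mirroring the eqDeleteRowSchema used in AbstractOptimizingTest) and commits both in a
   * single RowDelta, the same sequence the refactored rowDelta() test helper performs.
   */
  static void rowDelta(Table table, List<Record> inserts, List<Record> deletes,
                       StructLike partitionData) throws IOException {
    // partitionId=0 / taskId=1 match the builder arguments used in the patch.
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 1)
        .format(FileFormat.PARQUET)
        .build();

    DataFile dataFile = FileHelpers.writeDataFile(
        table,
        fileFactory.newOutputFile(partitionData).encryptingOutputFile(),
        partitionData,
        inserts);

    Schema eqDeleteRowSchema = table.schema().select("id");
    DeleteFile eqDeleteFile = FileHelpers.writeDeleteFile(
        table,
        fileFactory.newOutputFile(partitionData).encryptingOutputFile(),
        partitionData,
        deletes,
        eqDeleteRowSchema);

    RowDelta rowDelta = table.newRowDelta();
    rowDelta.addRows(dataFile);
    rowDelta.addDeletes(eqDeleteFile);
    rowDelta.commit();
  }
}
```

Position deletes follow the same shape: `FileHelpers.writeDeleteFile(table, outputFile, partitionData, List<Pair<CharSequence, Long>>)` returns a `Pair` whose first element is the `DeleteFile`, which is why `rowDeltaWithPos` in the patch calls `.first()` on the result.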