From 2c7c3026edcb5bffb9bcc39dfb2169bb37b86faf Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Tue, 30 Jul 2024 12:40:15 +0800
Subject: [PATCH 1/8] [Spark]: Relocate the 'path' property in the table
 options to 'location' for better presentation.

---
 .../org/apache/paimon/spark/SparkTable.scala   |  2 ++
 .../apache/paimon/spark/SparkReadITCase.java   | 18 ++++++++----------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 902619f05ba1..d269a4012ab3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -60,6 +60,8 @@ case class SparkTable(table: Table)
       properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", table.primaryKeys))
     }
     properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
+    val location = properties.remove(CoreOptions.PATH.key())
+    properties.put(TableCatalog.PROP_LOCATION, location)
     if (table.comment.isPresent) {
       properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
     }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index be6264f7b2d0..744801aadf37 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -193,11 +193,7 @@ public void testCreateTableAs() {
         assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[%s"
-                                        + "PARTITIONED BY (a)\n"
-                                        + "TBLPROPERTIES (\n"
-                                        + " 'path' = '%s')\n"
-                                        + "]]",
+                                "[[%s" + "PARTITIONED BY (a)\n" + "LOCATION '%s'\n" + "]]",
                                 showCreateString(
                                         "partitionedTableAs", "a BIGINT", "b STRING", "c STRING"),
                                 new Path(warehousePath, "default.db/partitionedTableAs")));
@@ -221,9 +217,9 @@ public void testCreateTableAs() {
                 .isEqualTo(
                         String.format(
                                 "[[%s"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
-                                        + " 'file.format' = 'parquet',\n"
-                                        + " 'path' = '%s')\n"
+                                        + " 'file.format' = 'parquet')\n"
                                         + "]]",
                                 showCreateString(
                                         "testTableAs", "a BIGINT", "b VARCHAR(10)", "c CHAR(10)"),
@@ -248,7 +244,9 @@ public void testCreateTableAs() {
         assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[%sTBLPROPERTIES (\n 'path' = '%s',\n 'primary-key' = 'a')\n]]",
+                                "[[%s"
+                                        + "LOCATION '%s'\n"
+                                        + "TBLPROPERTIES (\n 'primary-key' = 'a')\n]]",
                                 showCreateString(
                                         "t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
                                 new Path(warehousePath, "default.db/t_pk_as")));
@@ -275,8 +273,8 @@ public void testCreateTableAs() {
                         String.format(
                                 "[[%s"
                                         + "PARTITIONED BY (dt)\n"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
-                                        + " 'path' = '%s',\n"
                                         + " 'primary-key' = 'dt,hh')\n"
                                         + "]]",
                                 showCreateString(
@@ -369,9 +367,9 @@ public void testShowCreateTable() {
                                 "[[%s"
                                         + "PARTITIONED BY (b)\n"
                                         + "COMMENT 'tbl comment'\n"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
                                         + " 'k1' = 'v1',\n"
-                                        + " 'path' = '%s',\n"
                                         + " 'primary-key' = 'a,b')\n]]",
                                 showCreateString(
                                         "tbl",

From 5219a94c2550663d07e32b7c076fd67c9996b485 Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Thu, 1 Aug 2024 20:29:05 +0800
Subject: [PATCH 2/8] Don't remove 'path'

---
 .../src/main/scala/org/apache/paimon/spark/SparkTable.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index d269a4012ab3..7f179c1d541f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -60,8 +60,7 @@ case class SparkTable(table: Table)
       properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", table.primaryKeys))
     }
     properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
-    val location = properties.remove(CoreOptions.PATH.key())
-    properties.put(TableCatalog.PROP_LOCATION, location)
+    properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
     if (table.comment.isPresent) {
       properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
     }

From 0bb5889264403a5935aa4e95fb646181428186bf Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Sun, 4 Aug 2024 17:38:53 +0800
Subject: [PATCH 3/8] Support create table with user defined location.

A user-defined location travels through the 'path' table option:
SparkCatalog maps Spark's LOCATION clause onto 'path', HiveCatalog honors
it when creating external tables only, and FileSystemCatalog and
JdbcCatalog reject it explicitly.

---
 .../paimon/catalog/FileSystemCatalog.java     |  5 +++
 .../org/apache/paimon/jdbc/JdbcCatalog.java   |  5 +++
 .../org/apache/paimon/hive/HiveCatalog.java   | 34 ++++++++++++++----
 .../apache/paimon/hive/HiveCatalogTest.java   | 36 +++++++++++++++++++
 .../org/apache/paimon/spark/SparkCatalog.java |  5 +++
 5 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 14b4d171835c..1407284dab85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
@@ -36,6 +37,7 @@
 import java.util.concurrent.Callable;
 
 import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A catalog implementation for {@link FileIO}. */
 public class FileSystemCatalog extends AbstractCatalog {
@@ -117,6 +119,9 @@ protected void dropTableImpl(Identifier identifier) {
 
     @Override
     public void createTableImpl(Identifier identifier, Schema schema) {
+        checkArgument(
+                !schema.options().containsKey(CoreOptions.PATH.key()),
+                "The FileSystemCatalog does not support specifying location when creating a table.");
         uncheck(() -> schemaManager(identifier).createTable(schema));
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index bb4f61c638cf..0dc5d445b4ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.jdbc;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogLockContext;
@@ -57,6 +58,7 @@
 import static org.apache.paimon.jdbc.JdbcUtils.execute;
 import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -235,6 +237,9 @@ protected void dropTableImpl(Identifier identifier) {
 
     @Override
     protected void createTableImpl(Identifier identifier, Schema schema) {
+        checkArgument(
+                !schema.options().containsKey(CoreOptions.PATH.key()),
+                "The FileSystemCatalog does not support specifying location when creating a table.");
         try {
             // create table file
             getSchemaManager(identifier).createTable(schema);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index c5473446bdf9..2d39e26b23a1 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -78,6 +78,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -493,7 +494,18 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         // if changes on Hive fails there is no harm to perform the same changes to files again
         TableSchema tableSchema;
         try {
-            tableSchema = schemaManager(identifier).createTable(schema, usingExternalTable());
+            Path tableRoot;
+            if (schema.options().containsKey(CoreOptions.PATH.key())) {
+                checkArgument(
+                        Objects.equals(createTableType(), TableType.EXTERNAL),
+                        "The HiveCatalog only supports specifying location when creating an external table");
+                tableRoot = new Path(schema.options().get(CoreOptions.PATH.key()));
+            } else {
+                tableRoot = getTableLocation(identifier);
+            }
+
+            tableSchema =
+                    schemaManager(identifier, tableRoot).createTable(schema, usingExternalTable());
         } catch (Exception e) {
             throw new RuntimeException(
                     "Failed to commit changes of table "
@@ -707,10 +719,7 @@ public String warehouse() {
 
     private Table newHmsTable(Identifier identifier, Map<String, String> tableParameters) {
         long currentTimeMillis = System.currentTimeMillis();
-        TableType tableType =
-                OptionsUtils.convertToEnum(
-                        hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()),
-                        TableType.class);
+        TableType tableType = createTableType();
         Table table =
                 new Table(
                         identifier.getTableName(),
@@ -735,6 +744,11 @@ private Table newHmsTable(Identifier identifier, Map<String, String> tableParame
         return table;
     }
 
+    private TableType createTableType() {
+        return OptionsUtils.convertToEnum(
+                hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()), TableType.class);
+    }
+
     private void updateHmsTable(Table table, Identifier identifier, TableSchema schema) {
         StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor();
 
@@ -792,7 +806,11 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche
         }
 
         // update location
-        locationHelper.specifyTableLocation(table, getTableLocation(identifier).toString());
+        String location =
+                schema.options().containsKey(CoreOptions.PATH.key())
+                        ? schema.options().get(CoreOptions.PATH.key())
+                        : getTableLocation(identifier).toString();
+        locationHelper.specifyTableLocation(table, location);
     }
 
     private void updateHmsTablePars(Table table, TableSchema schema) {
@@ -816,6 +834,10 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
     }
 
     private SchemaManager schemaManager(Identifier identifier) {
+        return schemaManager(identifier, getTableLocation(identifier));
+    }
+
+    private SchemaManager schemaManager(Identifier identifier, Path path) {
         return new SchemaManager(
                         fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault())
                 .withLock(lock(identifier));
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index 6b13a80e801a..9459d3ca2ae3 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -29,6 +31,7 @@
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.HadoopUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -37,6 +40,7 @@
 import org.apache.thrift.TException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -268,4 +272,36 @@ public void testAlterHiveTableParameters() {
             fail("Test failed due to exception: " + e.getMessage());
         }
     }
+
+    @Test
+    public void testCreateExternalTableWithLocation(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        HiveConf hiveConf = new HiveConf();
+        String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID();
+        hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true");
+        hiveConf.set(CatalogOptions.TABLE_TYPE.key(), "external");
+        String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+        HiveCatalog externalWarehouseCatalog =
+                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse);
+
+        String externalTablePath = tempDir.toString();
+
+        Schema schema =
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        ImmutableMap.of("path", externalTablePath),
+                        "");
+
+        Identifier identifier = Identifier.create("default", "my_table");
+        externalWarehouseCatalog.createTable(identifier, schema, true);
+
+        org.apache.paimon.table.Table table = externalWarehouseCatalog.getTable(identifier);
+        assertThat(table.options())
+                .extracting(CoreOptions.PATH.key())
+                .isEqualTo("file:" + externalTablePath);
+
+        externalWarehouseCatalog.close();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index a2ea6d0faa34..6e507264110a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -392,6 +392,11 @@ private Schema toInitialSchema(
         Map<String, String> normalizedProperties = mergeSQLConf(properties);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
+        if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
+            String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
+            normalizedProperties.put(CoreOptions.PATH.key(), path);
+        }
+
         String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
         List<String> primaryKeys =
                 pkAsString == null

From b80667dd1bc86b3950ba0dd3cf05430110c12f45 Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Sun, 4 Aug 2024 22:20:20 +0800
Subject: [PATCH 4/8] Fix comment.

---
 .../src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 0dc5d445b4ee..00923f7ec924 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -239,7 +239,7 @@ protected void dropTableImpl(Identifier identifier) {
     protected void createTableImpl(Identifier identifier, Schema schema) {
         checkArgument(
                 !schema.options().containsKey(CoreOptions.PATH.key()),
-                "The FileSystemCatalog does not support specifying location when creating a table.");
+                "The JdbcCatalog does not support specifying location when creating a table.");
         try {
             // create table file
             getSchemaManager(identifier).createTable(schema);

From ec851e2fa1ac17d47bc246cbac1d9fd4c71b8d11 Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Mon, 5 Aug 2024 20:33:11 +0800
Subject: [PATCH 5/8] Fix location.
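
The schemaManager(identifier, path) overload introduced in PATCH 3/8 still
built its SchemaManager from getTableLocation(identifier) and silently
ignored the path argument, so a user-defined location was never used when
writing the schema file. A before/after recap of the one-line fix (names as
in HiveCatalog; this mirrors the diff below, it is not a new API):

    // before: the 'path' parameter was ignored
    new SchemaManager(fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault());
    // after: honor the caller-supplied table root
    new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault());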
---
 .../src/main/java/org/apache/paimon/hive/HiveCatalog.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 2d39e26b23a1..b388f254ea18 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -838,8 +838,7 @@ private SchemaManager schemaManager(Identifier identifier) {
     }
 
     private SchemaManager schemaManager(Identifier identifier, Path path) {
-        return new SchemaManager(
-                        fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault())
+        return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
                 .withLock(lock(identifier));
     }
 

From cd60d6ca7812d669fd5bf22e3f097e2ace7fd568 Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Mon, 5 Aug 2024 20:54:42 +0800
Subject: [PATCH 6/8] Fix clone table.

---
 .../java/org/apache/paimon/schema/Schema.java |  9 --------
 .../clone/PickFilesForCloneOperator.java      | 21 ++++++++++++++++++-
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index c6c79f4d4afd..589e3486aa7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -337,13 +337,4 @@ public Schema build() {
             return new Schema(columns, partitionKeys, primaryKeys, options, comment);
         }
     }
-
-    public static Schema fromTableSchema(TableSchema tableSchema) {
-        return new Schema(
-                tableSchema.fields(),
-                tableSchema.partitionKeys(),
-                tableSchema.primaryKeys(),
-                tableSchema.options(),
-                tableSchema.comment());
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
index 883d7b06ab5f..0651551fe2b6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -18,15 +18,21 @@
 
 package org.apache.paimon.flink.clone;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import com.google.common.collect.Iterables;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -37,6 +43,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Pick the files to be cloned of a table based on the input record. The record type it produce is
@@ -77,7 +84,7 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
         FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
         targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
         targetCatalog.createTable(
-                targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
+                targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);
 
         List<CloneFileInfo> result =
                 toCloneFileInfos(
@@ -95,6 +102,18 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
         }
     }
 
+    private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
+        return new Schema(
+                ImmutableList.copyOf(tableSchema.fields()),
+                ImmutableList.copyOf(tableSchema.partitionKeys()),
+                ImmutableList.copyOf(tableSchema.primaryKeys()),
+                ImmutableMap.copyOf(
+                        Iterables.filter(
+                                tableSchema.options().entrySet(),
+                                entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
+                tableSchema.comment());
+    }
+
     private List<CloneFileInfo> toCloneFileInfos(
             List<Path> files,
             Path sourceTableRoot,

From c2226d436a52c2078f4c9f91054a90f178d5e8df Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Mon, 5 Aug 2024 21:40:38 +0800
Subject: [PATCH 7/8] Fix style.

---
 .../apache/paimon/flink/clone/PickFilesForCloneOperator.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
index 0651551fe2b6..67eecbc6f2ae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -31,8 +31,8 @@
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
 
-import com.google.common.collect.Iterables;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

From fcafa9c122a8485fc4ac6ed4a7fd6159958e47ff Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Tue, 6 Aug 2024 11:11:25 +0800
Subject: [PATCH 8/8] Fix tests.
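
Since PATCH 2/8 keeps 'path' in the table properties while also exposing it
as PROP_LOCATION, SHOW CREATE TABLE now renders both a LOCATION clause and a
'path' entry under TBLPROPERTIES, so the expected strings gain a 'path' line
again. Illustrative shape of the expected output (paths abbreviated, not
copied from an actual test run):

    CREATE TABLE t_pk_as (...)
    LOCATION 'file:/.../default.db/t_pk_as'
    TBLPROPERTIES (
      'path' = 'file:/.../default.db/t_pk_as',
      'primary-key' = 'a')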
---
 .../apache/paimon/spark/SparkReadITCase.java | 39 +++++++++++++++----
 1 file changed, 31 insertions(+), 8 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 744801aadf37..122e051672e8 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -190,13 +190,21 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
         spark.sql(
                 "CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");
+
+        String tablePath = new Path(warehousePath, "default.db/partitionedTableAs").toString();
         assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[%s" + "PARTITIONED BY (a)\n" + "LOCATION '%s'\n" + "]]",
+                                "[[%s"
+                                        + "PARTITIONED BY (a)\n"
+                                        + "LOCATION '%s'\n"
+                                        + "TBLPROPERTIES (\n"
+                                        + " 'path' = '%s')\n"
+                                        + "]]",
                                 showCreateString(
                                         "partitionedTableAs", "a BIGINT", "b STRING", "c STRING"),
-                                new Path(warehousePath, "default.db/partitionedTableAs")));
+                                tablePath,
+                                tablePath));
         List<Row> resultPartition = spark.sql("SELECT * FROM partitionedTableAs").collectAsList();
         assertThat(resultPartition.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[1,aaa,bbb]");
@@ -213,17 +221,21 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
         spark.sql(
                 "CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");
+
+        String testTableAsPath = new Path(warehousePath, "default.db/testTableAs").toString();
         assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
                                 "[[%s"
                                         + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
-                                        + " 'file.format' = 'parquet')\n"
+                                        + " 'file.format' = 'parquet',\n"
+                                        + " 'path' = '%s')\n"
                                         + "]]",
                                 showCreateString(
                                         "testTableAs", "a BIGINT", "b VARCHAR(10)", "c CHAR(10)"),
-                                new Path(warehousePath, "default.db/testTableAs")));
+                                testTableAsPath,
+                                testTableAsPath));
 
         List<Row> resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList();
         assertThat(resultProp.stream().map(Row::toString))
@@ -241,15 +253,18 @@ public void testCreateTableAs() {
                         + "COMMENT 'table comment'");
         spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
         spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");
+
+        String tPkAsPath = new Path(warehousePath, "default.db/t_pk_as").toString();
         assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
                                 "[[%s"
                                         + "LOCATION '%s'\n"
-                                        + "TBLPROPERTIES (\n 'primary-key' = 'a')\n]]",
+                                        + "TBLPROPERTIES (\n 'path' = '%s',\n 'primary-key' = 'a')\n]]",
                                 showCreateString(
                                         "t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
-                                new Path(warehousePath, "default.db/t_pk_as")));
+                                tPkAsPath,
+                                tPkAsPath));
 
         List<Row> resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList();
         assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]");
@@ -268,6 +283,8 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
         spark.sql(
                 "CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");
+
+        String tAllAsPath = new Path(warehousePath, "default.db/t_all_as").toString();
         assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
@@ -275,6 +292,7 @@ public void testCreateTableAs() {
                                         + "PARTITIONED BY (dt)\n"
                                         + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
+                                        + " 'path' = '%s',\n"
                                         + " 'primary-key' = 'dt,hh')\n"
                                         + "]]",
                                 showCreateString(
@@ -284,7 +302,8 @@ public void testCreateTableAs() {
                                         "behavior STRING",
                                         "dt STRING NOT NULL",
                                         "hh STRING NOT NULL"),
-                                new Path(warehousePath, "default.db/t_all_as")));
+                                tAllAsPath,
+                                tAllAsPath));
         List<Row> resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList();
         assertThat(resultAll.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
@@ -361,6 +380,8 @@ public void testShowCreateTable() {
                         + " 'k1' = 'v1'\n"
                         + ")");
 
+        String tablePath = new Path(warehousePath, "default.db/tbl").toString();
+
         assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString())
                 .isEqualTo(
                         String.format(
@@ -370,12 +391,14 @@ public void testShowCreateTable() {
                                         + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
                                         + " 'k1' = 'v1',\n"
+                                        + " 'path' = '%s',\n"
                                         + " 'primary-key' = 'a,b')\n]]",
                                 showCreateString(
                                         "tbl",
                                         "a INT NOT NULL COMMENT 'a comment'",
                                         "b STRING NOT NULL"),
-                                new Path(warehousePath, "default.db/tbl")));
+                                tablePath,
+                                tablePath));
     }
 
     @Test