From a1f8d99459c5efe24910c45122c467ee55af2862 Mon Sep 17 00:00:00 2001
From: PlanetWalker <52364847+hellojinsilei@users.noreply.github.com>
Date: Wed, 15 Mar 2023 13:43:04 +0800
Subject: [PATCH] [ARCTIC-1210][Spark][Bugfix]: NPE when executing CREATE TABLE LIKE command without using arctic (#1211)

* add sort

* optimize create table like

* optimize create table like

* optimize create table like

* optimize create table like

* optimize create table like

* add catalog test

* add catalog test

* fix

---------

Co-authored-by: jinsilei
---
 .../analysis/RewriteArcticCommand.scala       | 54 ++++++++++++-------
 .../delegate/TestArcticSessionCatalog.java    | 47 ++++++++++++++++
 .../arctic/spark/hive/TestCreateTableDDL.java | 44 +++++++++++++++
 3 files changed, 126 insertions(+), 19 deletions(-)

diff --git a/spark/v3.1/spark/src/main/scala/com/netease/arctic/spark/sql/catalyst/analysis/RewriteArcticCommand.scala b/spark/v3.1/spark/src/main/scala/com/netease/arctic/spark/sql/catalyst/analysis/RewriteArcticCommand.scala
index c0048f1008..95018048fb 100644
--- a/spark/v3.1/spark/src/main/scala/com/netease/arctic/spark/sql/catalyst/analysis/RewriteArcticCommand.scala
+++ b/spark/v3.1/spark/src/main/scala/com/netease/arctic/spark/sql/catalyst/analysis/RewriteArcticCommand.scala
@@ -18,11 +18,13 @@
 
 package com.netease.arctic.spark.sql.catalyst.analysis
 
+import com.netease.arctic.spark.sql.ArcticExtensionUtils.buildCatalogAndIdentifier
 import com.netease.arctic.spark.sql.catalyst.plans.{AlterArcticTableDropPartition, TruncateArcticTable}
 import com.netease.arctic.spark.table.ArcticSparkTable
 import com.netease.arctic.spark.writer.WriteMode
-import com.netease.arctic.table.KeyedTable
+import com.netease.arctic.spark.{ArcticSparkCatalog, ArcticSparkSessionCatalog}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,6 +35,23 @@ import org.apache.spark.sql.execution.command.CreateTableLikeCommand
  * @param sparkSession
  */
 case class RewriteArcticCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+
+  def isCreateArcticTableLikeCommand(targetTable: TableIdentifier, provider: Option[String]): Boolean = {
+    val (targetCatalog, _) = buildCatalogAndIdentifier(sparkSession, targetTable)
+    targetCatalog match {
+      case _: ArcticSparkCatalog =>
+        if (provider.isEmpty || provider.get.equalsIgnoreCase("arctic")) {
+          true
+        } else {
+          throw new UnsupportedOperationException(s"Provider must be arctic or null when using ${classOf[ArcticSparkCatalog].getName}.")
+        }
+      case _: ArcticSparkSessionCatalog[_] =>
+        provider.isDefined && provider.get.equalsIgnoreCase("arctic")
+      case _ =>
+        false
+    }
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     import com.netease.arctic.spark.sql.ArcticExtensionUtils._
     plan match {
@@ -53,24 +72,21 @@ case class RewriteArcticCommand(sparkSession: SparkSession) extends Rule[Logical
           optionsMap += (WriteMode.WRITE_MODE_KEY -> WriteMode.OVERWRITE_DYNAMIC.mode)
         }
         c.copy(properties = propertiesMap, writeOptions = optionsMap)
-      case CreateTableLikeCommand(targetTable, sourceTable, storage, provider, properties, ifNotExists)
-        if provider.get != null && provider.get.equals("arctic") =>
-        val (sourceCatalog, sourceIdentifier) = buildCatalogAndIdentifier(sparkSession, sourceTable)
-        val (targetCatalog, targetIdentifier) = buildCatalogAndIdentifier(sparkSession, targetTable)
-        val table = sourceCatalog.loadTable(sourceIdentifier)
-        var targetProperties = properties
-        targetProperties += ("provider" -> "arctic")
-        table match {
-          case keyedTable: ArcticSparkTable =>
-            keyedTable.table() match {
-              case table: KeyedTable =>
-                targetProperties += ("primary.keys" -> String.join(",", table.primaryKeySpec().fieldNames()))
-              case _ =>
-            }
-          case _ =>
-        }
-        CreateV2Table(targetCatalog, targetIdentifier,
-          table.schema(), table.partitioning(), targetProperties, ifNotExists)
+      case c@CreateTableLikeCommand(targetTable, sourceTable, storage, provider, properties, ifNotExists)
+        if isCreateArcticTableLikeCommand(targetTable, provider) => {
+        val (sourceCatalog, sourceIdentifier) = buildCatalogAndIdentifier(sparkSession, sourceTable)
+        val (targetCatalog, targetIdentifier) = buildCatalogAndIdentifier(sparkSession, targetTable)
+        val table = sourceCatalog.loadTable(sourceIdentifier)
+        var targetProperties = properties
+        table match {
+          case arcticTable: ArcticSparkTable if arcticTable.table().isKeyedTable =>
+            targetProperties += ("primary.keys" -> String.join(",", arcticTable.table().asKeyedTable().primaryKeySpec().fieldNames()))
+          case _ =>
+        }
+        targetProperties += ("provider" -> "arctic")
+        CreateV2Table(targetCatalog, targetIdentifier,
+          table.schema(), table.partitioning(), targetProperties, ifNotExists)
+      }
       case _ => plan
     }
   }
 }
diff --git a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/delegate/TestArcticSessionCatalog.java b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/delegate/TestArcticSessionCatalog.java
index 52a46fc049..8c35ebf0ba 100644
--- a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/delegate/TestArcticSessionCatalog.java
+++ b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/delegate/TestArcticSessionCatalog.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
@@ -66,6 +67,8 @@ public static void startAll() throws IOException, ClassNotFoundException {
     configs.put("spark.sql.catalog.spark_catalog", ArcticSparkSessionCatalog.class.getName());
     configs.put("spark.sql.catalog.spark_catalog.url", amsUrl + "/" + catalogNameHive);
+    configs.put("spark.sql.catalog.catalog", SparkCatalog.class.getName());
+    configs.put("spark.sql.catalog.catalog.type", "hive");
     configs.put("spark.sql.arctic.delegate.enabled", "true");
 
     setUpSparkSession(configs);
 
@@ -218,6 +221,50 @@ public void testCreateTableLikeUsingSparkCatalog() {
     sql("drop table {0}.{1}", database, table3);
   }
 
+  @Test
+  public void testCreateTableLikeWithNoProvider() throws TException {
+    sql("set spark.sql.arctic.delegate.enabled=true");
+    sql("use spark_catalog");
+    sql("create table {0}.{1} ( \n" +
+        " id int , \n" +
+        " name string , \n " +
+        " ts timestamp, \n" +
+        " primary key (id) \n" +
+        ") using arctic \n" +
+        " partitioned by ( ts ) \n" +
+        " tblproperties ( \n" +
+        " ''props.test1'' = ''val1'', \n" +
+        " ''props.test2'' = ''val2'' ) ", database, table3);
+
+    sql("create table {0}.{1} like {2}.{3}", database, table2, database, table3);
+    Table hiveTableA = hms.getClient().getTable(database, table2);
+    Assert.assertNotNull(hiveTableA);
+    sql("drop table {0}.{1}", database, table2);
+
+    sql("drop table {0}.{1}", database, table3);
+  }
+
+  @Test
+  public void testCreateTableLikeWithoutArcticCatalogWithNoProvider() throws TException {
+    sql("use catalog");
+    sql("create table {0}.{1} ( \n" +
+        " id int , \n" +
+        " name string , \n " +
+        " ts timestamp \n" +
+        ") stored as parquet \n" +
+        " partitioned by ( ts ) \n" +
+        " tblproperties ( \n" +
+        " ''props.test1'' = ''val1'', \n" +
+        " ''props.test2'' = ''val2'' ) ", database, table3);
+
+    sql("create table {0}.{1} like {2}.{3}", database, table2, database, table3);
+    Table hiveTableA = hms.getClient().getTable(database, table2);
+    Assert.assertNotNull(hiveTableA);
+    sql("use spark_catalog");
+    sql("drop table {0}.{1}", database, table3);
+    sql("drop table {0}.{1}", database, table2);
+  }
+
   @Test
   public void testCreateTableAsSelect() {
diff --git a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/hive/TestCreateTableDDL.java b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/hive/TestCreateTableDDL.java
index c1dc0c00ef..366ff255e8 100644
--- a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/hive/TestCreateTableDDL.java
+++ b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/hive/TestCreateTableDDL.java
@@ -542,6 +542,50 @@ public void testCreateUnKeyedTableLike() {
     assertTableNotExist(identifier);
   }
 
+  @Test
+  public void testCreateTableLikeWithNoProvider() throws TException {
+    TableIdentifier identifierA = TableIdentifier.of(catalogNameHive, database, tableA);
+    TableIdentifier identifierB = TableIdentifier.of(catalogNameHive, database, tableB);
+
+    sql("create table {0}.{1} ( \n" +
+        " id int , \n" +
+        " name string , \n " +
+        " ts timestamp," +
+        " primary key (id) \n" +
+        ") using arctic \n" +
+        " partitioned by ( ts ) \n" +
+        " tblproperties ( \n" +
+        " ''props.test1'' = ''val1'', \n" +
+        " ''props.test2'' = ''val2'' ) ", database, tableB);
+
+    sql("create table {0}.{1} like {2}.{3}", database, tableA, database, tableB);
+    Table hiveTable1 = hms.getClient().getTable(database, tableA);
+    Assert.assertNotNull(hiveTable1);
+    Types.StructType expectedSchema = Types.StructType.of(
+        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "name", Types.StringType.get()),
+        Types.NestedField.optional(3, "ts", Types.TimestampType.withoutZone()));
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, loadTable(identifierA).schema().asStruct());
+    sql("desc table {0}.{1}", database, tableA);
+    assertDescResult(rows, Lists.newArrayList("id"));
+
+    sql("drop table {0}.{1}", database, tableA);
+
+    sql("create table {0}.{1} like {2}.{3} using arctic", database, tableA, database, tableB);
+    Table hiveTable2 = hms.getClient().getTable(database, tableA);
+    Assert.assertNotNull(hiveTable2);
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, loadTable(identifierA).schema().asStruct());
+    sql("desc table {0}.{1}", database, tableA);
+    assertDescResult(rows, Lists.newArrayList("id"));
+
+    sql("drop table {0}.{1}", database, tableA);
+    sql("drop table {0}.{1}", database, tableB);
+    assertTableNotExist(identifierB);
+
+  }
+
   @Test
   public void testCreateNewTableShouldHaveTimestampWithoutZone() {
     withSQLConf(ImmutableMap.of(