[ARCTIC-1210][Spark][Bugfix]: NPE when executing a CREATE TABLE LIKE command without USING arctic (apache#1211)

* add sort

* optimize create table like

* optimize create table like

* optimize create table like

* optimize create table like

* optimize create table like

* add catalog test

* add catalog test

* fix

---------

Co-authored-by: jinsilei <jinsilei@corp.netease.com>
2 people authored and ShawHee committed Dec 29, 2023
1 parent 7af5bd7 commit a1f8d99
Showing 3 changed files with 126 additions and 19 deletions.
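Root cause, in brief: `CreateTableLikeCommand.provider` is an `Option[String]`, and the old guard `provider.get != null && provider.get.equals("arctic")` dereferences the option before ever checking whether it is defined, so a statement like `CREATE TABLE t2 LIKE t1` with no `USING` clause blows up inside the rule instead of simply not matching. A minimal standalone sketch of the failure mode and of the Option-safe pattern the patch adopts (the names here are illustrative, not from the patch):

import scala.util.Try

object OptionGuardSketch extends App {
  val withProvider: Option[String] = Some("arctic") // CREATE TABLE ... LIKE ... USING arctic
  val noProvider: Option[String] = None             // CREATE TABLE ... LIKE ... (no USING clause)

  // Old-style guard: .get is evaluated before the null check can help,
  // and .get on None throws immediately.
  def unsafeIsArctic(provider: Option[String]): Boolean =
    provider.get != null && provider.get.equals("arctic")

  // Safe guard: stay inside the Option, as the patched rule does.
  def safeIsArctic(provider: Option[String]): Boolean =
    provider.exists(_.equalsIgnoreCase("arctic"))

  println(safeIsArctic(withProvider))      // true
  println(safeIsArctic(noProvider))        // false
  println(Try(unsafeIsArctic(noProvider))) // Failure(java.util.NoSuchElementException: None.get)
}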
@@ -18,11 +18,13 @@

 package com.netease.arctic.spark.sql.catalyst.analysis
 
+import com.netease.arctic.spark.sql.ArcticExtensionUtils.buildCatalogAndIdentifier
 import com.netease.arctic.spark.sql.catalyst.plans.{AlterArcticTableDropPartition, TruncateArcticTable}
 import com.netease.arctic.spark.table.ArcticSparkTable
 import com.netease.arctic.spark.writer.WriteMode
-import com.netease.arctic.table.KeyedTable
+import com.netease.arctic.spark.{ArcticSparkCatalog, ArcticSparkSessionCatalog}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,6 +35,23 @@ import org.apache.spark.sql.execution.command.CreateTableLikeCommand
  * @param sparkSession
  */
 case class RewriteArcticCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+
+  def isCreateArcticTableLikeCommand(targetTable: TableIdentifier, provider: Option[String]): Boolean = {
+    val (targetCatalog, _) = buildCatalogAndIdentifier(sparkSession, targetTable)
+    targetCatalog match {
+      case _: ArcticSparkCatalog =>
+        if (provider.isEmpty || provider.get.equalsIgnoreCase("arctic")) {
+          true
+        } else {
+          throw new UnsupportedOperationException(s"Provider must be arctic or null when using ${classOf[ArcticSparkCatalog].getName}.")
+        }
+      case _: ArcticSparkSessionCatalog[_] =>
+        provider.isDefined && provider.get.equalsIgnoreCase("arctic")
+      case _ =>
+        false
+    }
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     import com.netease.arctic.spark.sql.ArcticExtensionUtils._
     plan match {
@@ -53,24 +72,21 @@ case class RewriteArcticCommand(sparkSession: SparkSession) extends Rule[Logical
           optionsMap += (WriteMode.WRITE_MODE_KEY -> WriteMode.OVERWRITE_DYNAMIC.mode)
         }
         c.copy(properties = propertiesMap, writeOptions = optionsMap)
-      case CreateTableLikeCommand(targetTable, sourceTable, storage, provider, properties, ifNotExists)
-        if provider.get != null && provider.get.equals("arctic") =>
-        val (sourceCatalog, sourceIdentifier) = buildCatalogAndIdentifier(sparkSession, sourceTable)
-        val (targetCatalog, targetIdentifier) = buildCatalogAndIdentifier(sparkSession, targetTable)
-        val table = sourceCatalog.loadTable(sourceIdentifier)
-        var targetProperties = properties
-        targetProperties += ("provider" -> "arctic")
-        table match {
-          case keyedTable: ArcticSparkTable =>
-            keyedTable.table() match {
-              case table: KeyedTable =>
-                targetProperties += ("primary.keys" -> String.join(",", table.primaryKeySpec().fieldNames()))
-              case _ =>
-            }
-          case _ =>
-        }
-        CreateV2Table(targetCatalog, targetIdentifier,
-          table.schema(), table.partitioning(), targetProperties, ifNotExists)
+      case c@CreateTableLikeCommand(targetTable, sourceTable, storage, provider, properties, ifNotExists)
+        if isCreateArcticTableLikeCommand(targetTable, provider) => {
+        val (sourceCatalog, sourceIdentifier) = buildCatalogAndIdentifier(sparkSession, sourceTable)
+        val (targetCatalog, targetIdentifier) = buildCatalogAndIdentifier(sparkSession, targetTable)
+        val table = sourceCatalog.loadTable(sourceIdentifier)
+        var targetProperties = properties
+        table match {
+          case arcticTable: ArcticSparkTable if arcticTable.table().isKeyedTable =>
+            targetProperties += ("primary.keys" -> String.join(",", arcticTable.table().asKeyedTable().primaryKeySpec().fieldNames()))
+          case _ =>
+        }
+        targetProperties += ("provider" -> "arctic")
+        CreateV2Table(targetCatalog, targetIdentifier,
+          table.schema(), table.partitioning(), targetProperties, ifNotExists)
+      }
       case _ => plan
     }
   }
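After the patch, whether a LIKE command is rewritten depends on the target catalog: under ArcticSparkCatalog the provider must be absent or arctic (anything else is rejected with an UnsupportedOperationException), under ArcticSparkSessionCatalog only an explicit arctic provider triggers the rewrite, and any other catalog falls through to Spark untouched. The test changes below register a vanilla Iceberg SparkCatalog named `catalog` next to the Arctic session catalog so the fallthrough branch is covered. A sketch of the end-user view, assuming the Arctic runtime jars are on the classpath, the Arctic SQL extensions are enabled, and a session is wired roughly like the test setup (master, database, table names, and AMS URL are placeholders):

import org.apache.spark.sql.SparkSession

object CreateTableLikeSketch extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    // Arctic session catalog, as in the tests (URL is a placeholder):
    .config("spark.sql.catalog.spark_catalog", "com.netease.arctic.spark.ArcticSparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.url", "thrift://<ams-host>:<port>/<catalog>")
    // Plain Iceberg catalog used to exercise the non-Arctic fallthrough:
    .config("spark.sql.catalog.catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.catalog.type", "hive")
    .getOrCreate()

  // Previously this threw before the rule could decline to match; now the rule
  // only claims the command when the provider (or the target catalog) implies arctic.
  spark.sql("CREATE TABLE db.t_copy LIKE db.t_source")

  // An explicit provider still routes the command through the Arctic rewrite.
  spark.sql("CREATE TABLE db.t_copy2 LIKE db.t_source USING arctic")

  spark.stop()
}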
@@ -25,6 +25,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
@@ -66,6 +67,8 @@ public static void startAll() throws IOException, ClassNotFoundException {

configs.put("spark.sql.catalog.spark_catalog", ArcticSparkSessionCatalog.class.getName());
configs.put("spark.sql.catalog.spark_catalog.url", amsUrl + "/" + catalogNameHive);
configs.put("spark.sql.catalog.catalog", SparkCatalog.class.getName());
configs.put("spark.sql.catalog.catalog.type", "hive");
configs.put("spark.sql.arctic.delegate.enabled", "true");

setUpSparkSession(configs);
@@ -218,6 +221,50 @@ public void testCreateTableLikeUsingSparkCatalog() {
sql("drop table {0}.{1}", database, table3);
}

@Test
public void testCreateTableLikeWithNoProvider() throws TException {
sql("set spark.sql.arctic.delegate.enabled=true");
sql("use spark_catalog");
sql("create table {0}.{1} ( \n" +
" id int , \n" +
" name string , \n " +
" ts timestamp, \n" +
" primary key (id) \n" +
") using arctic \n" +
" partitioned by ( ts ) \n" +
" tblproperties ( \n" +
" ''props.test1'' = ''val1'', \n" +
" ''props.test2'' = ''val2'' ) ", database, table3);

sql("create table {0}.{1} like {2}.{3}", database, table2, database, table3);
Table hiveTableA = hms.getClient().getTable(database, table2);
Assert.assertNotNull(hiveTableA);
sql("drop table {0}.{1}", database, table2);

sql("drop table {0}.{1}", database, table3);
}

@Test
public void testCreateTableLikeWithoutArcticCatalogWithNoProvider() throws TException {
sql("use catalog");
sql("create table {0}.{1} ( \n" +
" id int , \n" +
" name string , \n " +
" ts timestamp \n" +
") stored as parquet \n" +
" partitioned by ( ts ) \n" +
" tblproperties ( \n" +
" ''props.test1'' = ''val1'', \n" +
" ''props.test2'' = ''val2'' ) ", database, table3);

sql("create table {0}.{1} like {2}.{3}", database, table2, database, table3);
Table hiveTableA = hms.getClient().getTable(database, table2);
Assert.assertNotNull(hiveTableA);
sql("use spark_catalog");
sql("drop table {0}.{1}", database, table3);
sql("drop table {0}.{1}", database, table2);
}


@Test
public void testCreateTableAsSelect() {
@@ -542,6 +542,50 @@ public void testCreateUnKeyedTableLike() {
     assertTableNotExist(identifier);
   }

+  @Test
+  public void testCreateTableLikeWithNoProvider() throws TException {
+    TableIdentifier identifierA = TableIdentifier.of(catalogNameHive, database, tableA);
+    TableIdentifier identifierB = TableIdentifier.of(catalogNameHive, database, tableB);
+
+    sql("create table {0}.{1} ( \n" +
+        " id int , \n" +
+        " name string , \n " +
+        " ts timestamp," +
+        " primary key (id) \n" +
+        ") using arctic \n" +
+        " partitioned by ( ts ) \n" +
+        " tblproperties ( \n" +
+        " ''props.test1'' = ''val1'', \n" +
+        " ''props.test2'' = ''val2'' ) ", database, tableB);
+
+    sql("create table {0}.{1} like {2}.{3}", database, tableA, database, tableB);
+    Table hiveTable1 = hms.getClient().getTable(database, tableA);
+    Assert.assertNotNull(hiveTable1);
+    Types.StructType expectedSchema = Types.StructType.of(
+        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "name", Types.StringType.get()),
+        Types.NestedField.optional(3, "ts", Types.TimestampType.withoutZone()));
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, loadTable(identifierA).schema().asStruct());
+    sql("desc table {0}.{1}", database, tableA);
+    assertDescResult(rows, Lists.newArrayList("id"));
+
+    sql("drop table {0}.{1}", database, tableA);
+
+    sql("create table {0}.{1} like {2}.{3} using arctic", database, tableA, database, tableB);
+    Table hiveTable2 = hms.getClient().getTable(database, tableA);
+    Assert.assertNotNull(hiveTable2);
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, loadTable(identifierA).schema().asStruct());
+    sql("desc table {0}.{1}", database, tableA);
+    assertDescResult(rows, Lists.newArrayList("id"));
+
+    sql("drop table {0}.{1}", database, tableA);
+    sql("drop table {0}.{1}", database, tableB);
+    assertTableNotExist(identifierB);
+
+  }
+
   @Test
   public void testCreateNewTableShouldHaveTimestampWithoutZone() {
     withSQLConf(ImmutableMap.of(