@@ -24,9 +24,10 @@ import scala.collection.JavaConverters._
2424
2525import org .scalatest .BeforeAndAfter
2626
27- import org .apache .spark .sql .{DataFrame , QueryTest }
27+ import org .apache .spark .sql .{DataFrame , QueryTest , SaveMode }
2828import org .apache .spark .sql .catalog .v2 .Identifier
2929import org .apache .spark .sql .catalog .v2 .expressions .Transform
30+ import org .apache .spark .sql .catalyst .TableIdentifier
3031import org .apache .spark .sql .catalyst .analysis .TableAlreadyExistsException
3132import org .apache .spark .sql .execution .datasources .v2 .V2SessionCatalog
3233import org .apache .spark .sql .internal .SQLConf
@@ -54,16 +55,50 @@ class DataSourceV2DataFrameSessionCatalogSuite
5455 private def verifyTable (tableName : String , expected : DataFrame ): Unit = {
5556 checkAnswer(spark.table(tableName), expected)
5657 checkAnswer(sql(s " SELECT * FROM $tableName" ), expected)
58+ checkAnswer(sql(s " SELECT * FROM default. $tableName" ), expected)
5759 checkAnswer(sql(s " TABLE $tableName" ), expected)
5860 }
5961
60- test(" saveAsTable and v2 table - table doesn't exist" ) {
62+ test(" saveAsTable: v2 table - table doesn't exist and default mode (ErrorIfExists) " ) {
6163 val t1 = " tbl"
6264 val df = Seq ((1L , " a" ), (2L , " b" ), (3L , " c" )).toDF(" id" , " data" )
6365 df.write.format(v2Format).saveAsTable(t1)
6466 verifyTable(t1, df)
6567 }
6668
69+ test(" saveAsTable: v2 table - table doesn't exist and append mode" ) {
70+ val t1 = " tbl"
71+ val df = Seq ((1L , " a" ), (2L , " b" ), (3L , " c" )).toDF(" id" , " data" )
72+ df.write.format(v2Format).mode(" append" ).saveAsTable(t1)
73+ verifyTable(t1, df)
74+ }
75+
76+ test(" saveAsTable: Append mode should not fail if the table not exists " +
77+ " but a same-name temp view exist" ) {
78+ withTable(" same_name" ) {
79+ withTempView(" same_name" ) {
80+ spark.range(10 ).createTempView(" same_name" )
81+ spark.range(20 ).write.format(v2Format).mode(SaveMode .Append ).saveAsTable(" same_name" )
82+ assert(
83+ spark.sessionState.catalog.tableExists(TableIdentifier (" same_name" , Some (" default" ))))
84+ }
85+ }
86+ }
87+
88+ test(" saveAsTable: Append mode should not fail if the table already exists " +
89+ " and a same-name temp view exist" ) {
90+ withTable(" same_name" ) {
91+ withTempView(" same_name" ) {
92+ val format = spark.sessionState.conf.defaultDataSourceName
93+ sql(s " CREATE TABLE same_name(id LONG) USING $format" )
94+ spark.range(10 ).createTempView(" same_name" )
95+ spark.range(20 ).write.format(v2Format).mode(SaveMode .Append ).saveAsTable(" same_name" )
96+ checkAnswer(spark.table(" same_name" ), spark.range(10 ).toDF())
97+ checkAnswer(spark.table(" default.same_name" ), spark.range(20 ).toDF())
98+ }
99+ }
100+ }
101+
67102 test(" saveAsTable: v2 table - table exists" ) {
68103 val t1 = " tbl"
69104 val df = Seq ((1L , " a" ), (2L , " b" ), (3L , " c" )).toDF(" id" , " data" )
@@ -94,6 +129,32 @@ class DataSourceV2DataFrameSessionCatalogSuite
94129 verifyTable(t1, df)
95130 }
96131
132+ test(" saveAsTable: Overwrite mode should not drop the temp view if the table not exists " +
133+ " but a same-name temp view exist" ) {
134+ withTable(" same_name" ) {
135+ withTempView(" same_name" ) {
136+ spark.range(10 ).createTempView(" same_name" )
137+ spark.range(20 ).write.format(v2Format).mode(SaveMode .Overwrite ).saveAsTable(" same_name" )
138+ assert(spark.sessionState.catalog.getTempView(" same_name" ).isDefined)
139+ assert(
140+ spark.sessionState.catalog.tableExists(TableIdentifier (" same_name" , Some (" default" ))))
141+ }
142+ }
143+ }
144+
145+ test(" saveAsTable with mode Overwrite should not fail if the table already exists " +
146+ " and a same-name temp view exist" ) {
147+ withTable(" same_name" ) {
148+ withTempView(" same_name" ) {
149+ sql(s " CREATE TABLE same_name(id LONG) USING $v2Format" )
150+ spark.range(10 ).createTempView(" same_name" )
151+ spark.range(20 ).write.format(v2Format).mode(SaveMode .Overwrite ).saveAsTable(" same_name" )
152+ checkAnswer(spark.table(" same_name" ), spark.range(10 ).toDF())
153+ checkAnswer(spark.table(" default.same_name" ), spark.range(20 ).toDF())
154+ }
155+ }
156+ }
157+
97158 test(" saveAsTable: v2 table - ignore mode and table doesn't exist" ) {
98159 val t1 = " tbl"
99160 val df = Seq ((1L , " a" ), (2L , " b" ), (3L , " c" )).toDF(" id" , " data" )
@@ -116,20 +177,32 @@ class InMemoryTableProvider extends TableProvider {
116177 }
117178}
118179
180+ /** A second fake format to test behavior with format changes. */
181+ class InMemoryTableProvider2 extends InMemoryTableProvider
182+
119183/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */
120184class TestV2SessionCatalog extends V2SessionCatalog {
121185
122186 protected val tables : util.Map [Identifier , InMemoryTable ] =
123187 new ConcurrentHashMap [Identifier , InMemoryTable ]()
124188
189+ private def fullIdentifier (ident : Identifier ): Identifier = {
190+ if (ident.namespace().isEmpty) {
191+ Identifier .of(Array (" default" ), ident.name())
192+ } else {
193+ ident
194+ }
195+ }
196+
125197 override def loadTable (ident : Identifier ): Table = {
126- if (tables.containsKey(ident)) {
127- tables.get(ident)
198+ val fullIdent = fullIdentifier(ident)
199+ if (tables.containsKey(fullIdent)) {
200+ tables.get(fullIdent)
128201 } else {
129202 // Table was created through the built-in catalog
130- val t = super .loadTable(ident )
203+ val t = super .loadTable(fullIdent )
131204 val table = new InMemoryTable (t.name(), t.schema(), t.partitioning(), t.properties())
132- tables.put(ident , table)
205+ tables.put(fullIdent , table)
133206 table
134207 }
135208 }
@@ -139,8 +212,10 @@ class TestV2SessionCatalog extends V2SessionCatalog {
139212 schema : StructType ,
140213 partitions : Array [Transform ],
141214 properties : util.Map [String , String ]): Table = {
142- val t = new InMemoryTable (ident.name(), schema, partitions, properties)
143- tables.put(ident, t)
215+ val created = super .createTable(ident, schema, partitions, properties)
216+ val t = new InMemoryTable (created.name(), schema, partitions, properties)
217+ val fullIdent = fullIdentifier(ident)
218+ tables.put(fullIdent, t)
144219 t
145220 }
146221
0 commit comments