
Commit 604d679

edrevo authored and cloud-fan committed
[SPARK-30226][SQL] Remove withXXX functions in WriteBuilder
### What changes were proposed in this pull request?
Adding a `LogicalWriteInfo` interface, as suggested by cloud-fan in #25990 (comment).

### Why are the changes needed?
It provides compile-time guarantees where we previously had none, which will make it harder to introduce bugs in the future.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Compiles and passes tests.

Closes #26678 from edrevo/add-logical-write-info.

Lead-authored-by: Ximo Guanter <joaquin.guantergonzalbez@telefonica.com>
Co-authored-by: Ximo Guanter
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 3eade74 commit 604d679
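
To make the interface change concrete, here is a minimal sketch of a table implementing the new signature. The `ExampleTable` below is hypothetical and not part of this commit; it only illustrates that the query id, input schema and options now arrive together through `LogicalWriteInfo` instead of the removed `withQueryId`/`withInputDataSchema` calls.

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

// Hypothetical table: only the write-related pieces are shown.
class ExampleTable(tableSchema: StructType) extends SupportsWrite {
  override def name(): String = "example"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  // Previously: newWriteBuilder(options: CaseInsensitiveStringMap), with the schema
  // and query id pushed in later via withXXX defaults that could be silently ignored.
  // Now everything is available when the builder is constructed.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new WriteBuilder {
      private val inputSchema: StructType = info.schema()
      private val queryId: String = info.queryId()
      // buildForBatch()/buildForStreaming() are omitted here; their defaults throw,
      // so a real connector would override them.
    }
}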

File tree: 34 files changed, +207 −179 lines

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala

Lines changed: 3 additions & 3 deletions

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroUtils
-import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -42,8 +42,8 @@ case class AvroTable(
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
 
-  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
-    new AvroWriteBuilder(options, paths, formatName, supportsDataType)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+    new AvroWriteBuilder(paths, formatName, supportsDataType, info)
 
   override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala

Lines changed: 4 additions & 4 deletions

@@ -19,18 +19,18 @@ package org.apache.spark.sql.v2.avro
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.sql.avro.AvroUtils
+import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.OutputWriterFactory
 import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class AvroWriteBuilder(
-    options: CaseInsensitiveStringMap,
     paths: Seq[String],
     formatName: String,
-    supportsDataType: DataType => Boolean)
-  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+    supportsDataType: DataType => Boolean,
+    info: LogicalWriteInfo)
+  extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
   override def prepareWrite(
       sqlConf: SQLConf,
       job: Job,

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 4 additions & 8 deletions

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -392,18 +392,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
       () => new KafkaScan(options)
 
-    override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
       new WriteBuilder {
-        private var inputSchema: StructType = _
+        private val options = info.options
+        private val inputSchema: StructType = info.schema()
         private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
         private val producerParams =
           kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap))
 
-        override def withInputDataSchema(schema: StructType): WriteBuilder = {
-          this.inputSchema = schema
-          this
-        }
-
         override def buildForBatch(): BatchWrite = {
           assert(inputSchema != null)
           new KafkaBatchWrite(topic, producerParams, inputSchema)

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagedTable.java

Lines changed: 5 additions & 5 deletions

@@ -21,8 +21,8 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * Represents a table which is staged for being committed to the metastore.
@@ -32,10 +32,10 @@
  * {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or
  * {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the
  * table for being written to. This table should usually implement {@link SupportsWrite}. A new
- * writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)},
- * and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()},
- * at which point implementations are expected to commit the table's metadata into the metastore
- * along with the data that was written by the writes from the write builder this table created.
+ * writer will be constructed via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}, and the
+ * write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, at
+ * which point implementations are expected to commit the table's metadata into the metastore along
+ * with the data that was written by the writes from the write builder this table created.
  */
 @Experimental
 public interface StagedTable extends Table {

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java

Lines changed: 4 additions & 4 deletions

@@ -21,13 +21,13 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
@@ -39,9 +39,9 @@
 * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
 * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
 * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
- * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
- * write operation fails, the catalog will have already dropped the table, and the planner cannot
- * roll back the dropping of the table.
+ * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
+ * However, if the write operation fails, the catalog will have already dropped the table, and the
+ * planner cannot roll back the dropping of the table.
 * <p>
 * If the catalog implements this plugin, the catalog can implement the methods to "stage" the
 * creation and the replacement of a table. After the table's

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsWrite.java

Lines changed: 4 additions & 4 deletions

@@ -19,13 +19,13 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * A mix-in interface of {@link Table}, to indicate that it's writable. This adds
- * {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
- * for batch or streaming.
+ * {@link #newWriteBuilder(LogicalWriteInfo)} that is used to create a
+ * write for batch or streaming.
 */
 @Experimental
 public interface SupportsWrite extends Table {
@@ -34,5 +34,5 @@ public interface SupportsWrite extends Table {
   * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
   * this method to configure each data source write.
   */
-  WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
+  WriteBuilder newWriteBuilder(LogicalWriteInfo info);
 }
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * This interface contains logical write information that data sources can use when generating a
+ * {@link WriteBuilder}.
+ */
+@Evolving
+public interface LogicalWriteInfo {
+  /**
+   * the options that the user specified when writing the dataset
+   */
+  CaseInsensitiveStringMap options();
+
+  /**
+   * `queryId` is a unique string of the query. It's possible that there are many queries
+   * running at the same time, or a query is restarted and resumed. {@link BatchWrite} can use
+   * this id to identify the query.
+   */
+  String queryId();
+
+  /**
+   * the schema of the input data from Spark to data source.
+   */
+  StructType schema();
+}
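
As a usage illustration (not part of this commit), a connector-side builder can now capture all three pieces of information as immutable fields; the `LoggingWriteBuilder` name and the `describe` helper below are made up for this sketch.

import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Everything the builder needs is bound at construction time, so plain vals
// replace the vars that withQueryId/withInputDataSchema used to populate.
class LoggingWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {
  private val queryId: String = info.queryId()
  private val inputSchema: StructType = info.schema()
  private val options: CaseInsensitiveStringMap = info.options()

  // buildForBatch()/buildForStreaming() keep their default (throwing) behaviour;
  // this sketch only shows how the new argument is consumed.
  def describe(): String =
    s"query $queryId writing ${inputSchema.fields.length} columns " +
      s"with ${options.size()} options"
}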

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java

Lines changed: 0 additions & 23 deletions

@@ -21,7 +21,6 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
-import org.apache.spark.sql.types.StructType;
 
 /**
  * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to
@@ -33,28 +32,6 @@
 @Evolving
 public interface WriteBuilder {
 
-  /**
-   * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. It's
-   * possible that there are many queries running at the same time, or a query is restarted and
-   * resumed. {@link BatchWrite} can use this id to identify the query.
-   *
-   * @return a new builder with the `queryId`. By default it returns `this`, which means the given
-   *         `queryId` is ignored. Please override this method to take the `queryId`.
-   */
-  default WriteBuilder withQueryId(String queryId) {
-    return this;
-  }
-
-  /**
-   * Passes the schema of the input data from Spark to data source.
-   *
-   * @return a new builder with the `schema`. By default it returns `this`, which means the given
-   *         `schema` is ignored. Please override this method to take the `schema`.
-   */
-  default WriteBuilder withInputDataSchema(StructType schema) {
-    return this;
-  }
-
  /**
   * Returns a {@link BatchWrite} to write data to batch source. By default this method throws
   * exception, data sources must overwrite this method to provide an implementation, if the
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write
+
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+private[sql] case class LogicalWriteInfoImpl(
+    queryId: String,
+    schema: StructType,
+    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
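
For completeness, a rough sketch of the Spark-side call path this case class enables. Because `LogicalWriteInfoImpl` is `private[sql]`, code like this can only live inside Spark's own `org.apache.spark.sql` packages; the `LogicalWriteInfoSketch` object, its `createWriteBuilder` helper, and the use of a random UUID as the query id are illustrative assumptions, not APIs added by this commit.

// Placed under org.apache.spark.sql only so the private[sql] case class is visible.
package org.apache.spark.sql

import java.util.UUID

import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object LogicalWriteInfoSketch {
  // Packages the query id, input schema and options into one immutable value and
  // hands it to the table, replacing the old
  // newWriteBuilder(options).withQueryId(id).withInputDataSchema(schema) chain.
  def createWriteBuilder(
      table: SupportsWrite,
      schema: StructType,
      options: CaseInsensitiveStringMap): WriteBuilder = {
    val info = LogicalWriteInfoImpl(
      queryId = UUID.randomUUID().toString,
      schema = schema,
      options = options)
    table.newWriteBuilder(info)
  }
}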

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala

Lines changed: 2 additions & 2 deletions

@@ -95,8 +95,8 @@ class InMemoryTable(
     override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
   }
 
-  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
-    InMemoryTable.maybeSimulateFailedTableWrite(options)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    InMemoryTable.maybeSimulateFailedTableWrite(info.options)
 
     new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
       private var writer: BatchWrite = Append
