diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 0e6171724402e..d60ee1cadd195 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- * are not Kafka consumer params.
+ * @param sourceOptions Params which are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.
* @param failOnDataLoss Flag indicating whether reading should fail in data loss
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index a6303461445fa..6972f391f2852 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread
/**
@@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
- options: DataSourceOptions,
+ options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
@@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream(
"kafkaConsumer.pollTimeoutMs",
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
- private val maxOffsetsPerTrigger =
- Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+ private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
private val rangeCalculator = KafkaOffsetRangeCalculator(options)
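Note on the lookup change above: unlike DataSourceOptions.get, which returned a java.util.Optional, CaseInsensitiveStringMap.get returns the value directly or null, so callers wrap it in a Scala Option. A minimal sketch of the new pattern, reusing the option names from this file with made-up values:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical option values, for illustration only.
    val options = new CaseInsensitiveStringMap(
      Map("maxOffsetsPerTrigger" -> "1000", "minPartitions" -> "4").asJava)

    // get() returns the raw value or null, so wrap it before mapping.
    val maxOffsetsPerTrigger: Option[Long] =
      Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

    // Keys are matched case-insensitively; values keep their original form.
    assert(options.get("MAXOFFSETSPERTRIGGER") == "1000")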
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 6008794924052..1af8404b89c68 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
@@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
private[kafka010] object KafkaOffsetRangeCalculator {
- def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
- val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
+ def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
+ val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
new KafkaOffsetRangeCalculator(optionalValue)
}
}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index b39e0d40fd31c..8496cbda261be 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -103,8 +104,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}
- override def getTable(options: DataSourceOptions): KafkaTable = {
- new KafkaTable(strategy(options.asMap().asScala.toMap))
+ override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
+ new KafkaTable(strategy(options.asScala.toMap))
}
/**
@@ -358,11 +359,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def schema(): StructType = KafkaOffsetReader.kafkaSchema
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder {
private var inputSchema: StructType = _
@@ -375,20 +376,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
import scala.collection.JavaConverters._
assert(inputSchema != null)
- val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
- val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+ val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
+ val producerParams = kafkaParamsForProducer(options.asScala.toMap)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
}
}
- class KafkaScan(options: DataSourceOptions) extends Scan {
+ class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {
override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
- val parameters = options.asMap().asScala.toMap
+ val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -417,7 +418,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
- val parameters = options.asMap().asScala.toMap
+ val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 8fd5790d753af..21634ae2abfa1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -41,10 +41,10 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
@@ -1118,7 +1118,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
- val dsOptions = new DataSourceOptions(options.asJava)
+ val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = provider.getTable(dsOptions)
val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
val inputPartitions = stream.planInputPartitions(
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 2ccf3e291bea7..7ffdaab3e74fb 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -22,13 +22,13 @@ import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
def testWithMinPartitions(name: String, minPartition: Int)
(f: KafkaOffsetRangeCalculator => Unit): Unit = {
- val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava)
+ val options = new CaseInsensitiveStringMap(Map("minPartitions" -> minPartition.toString).asJava)
test(s"with minPartition = $minPartition: $name") {
f(KafkaOffsetRangeCalculator(options))
}
@@ -36,7 +36,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
test("with no minPartition: N TopicPartitions to N offset ranges") {
- val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+ val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
assert(
calc.getRanges(
fromOffsets = Map(tp1 -> 1),
@@ -64,7 +64,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
}
test("with no minPartition: empty ranges ignored") {
- val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+ val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
assert(
calc.getRanges(
fromOffsets = Map(tp1 -> 1, tp2 -> 1),
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index 8c5a6c61d8658..704d90ed60adc 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -31,19 +31,20 @@
* This is used to pass options to v2 implementations to ensure consistent case insensitivity.
*
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
- * keys converted to lower case.
+ * keys converted to lower case. This map does not allow null keys.
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {
public static CaseInsensitiveStringMap empty() {
- return new CaseInsensitiveStringMap();
+ return new CaseInsensitiveStringMap(new HashMap<>(0));
}
private final Map<String, String> delegate;
- private CaseInsensitiveStringMap() {
- this.delegate = new HashMap<>();
+ public CaseInsensitiveStringMap(Map<String, String> originalMap) {
+ this.delegate = new HashMap<>(originalMap.size());
+ putAll(originalMap);
}
@Override
@@ -56,9 +57,13 @@ public boolean isEmpty() {
return delegate.isEmpty();
}
+ private String toLowerCase(Object key) {
+ return key.toString().toLowerCase(Locale.ROOT);
+ }
+
@Override
public boolean containsKey(Object key) {
- return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+ return delegate.containsKey(toLowerCase(key));
}
@Override
@@ -68,17 +73,17 @@ public boolean containsValue(Object value) {
@Override
public String get(Object key) {
- return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+ return delegate.get(toLowerCase(key));
}
@Override
public String put(String key, String value) {
- return delegate.put(key.toLowerCase(Locale.ROOT), value);
+ return delegate.put(toLowerCase(key), value);
}
@Override
public String remove(Object key) {
- return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+ return delegate.remove(toLowerCase(key));
}
@Override
@@ -107,4 +112,49 @@ public Collection<String> values() {
public Set<Map.Entry<String, String>> entrySet() {
return delegate.entrySet();
}
+
+ /**
+ * Returns the boolean value to which the specified key is mapped,
+ * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+ */
+ public boolean getBoolean(String key, boolean defaultValue) {
+ String value = get(key);
+ // We can't use `Boolean.parseBoolean` here, as it returns false for invalid strings.
+ if (value == null) {
+ return defaultValue;
+ } else if (value.equalsIgnoreCase("true")) {
+ return true;
+ } else if (value.equalsIgnoreCase("false")) {
+ return false;
+ } else {
+ throw new IllegalArgumentException(value + " is not a boolean string.");
+ }
+ }
+
+ /**
+ * Returns the integer value to which the specified key is mapped,
+ * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+ */
+ public int getInt(String key, int defaultValue) {
+ String value = get(key);
+ return value == null ? defaultValue : Integer.parseInt(value);
+ }
+
+ /**
+ * Returns the long value to which the specified key is mapped,
+ * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+ */
+ public long getLong(String key, long defaultValue) {
+ String value = get(key);
+ return value == null ? defaultValue : Long.parseLong(value);
+ }
+
+ /**
+ * Returns the double value to which the specified key is mapped,
+ * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+ */
+ public double getDouble(String key, double defaultValue) {
+ String value = get(key);
+ return value == null ? defaultValue : Double.parseDouble(value);
+ }
}
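A behavioral note on getBoolean above: the removed DataSourceOptions.getBoolean delegated to Boolean.parseBoolean, which silently treats any unrecognized string as false, whereas this implementation rejects values other than "true"/"false". A small sketch of the difference, using the same keys as the updated test suite below:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val options = new CaseInsensitiveStringMap(
      Map("isFoo" -> "true", "foo" -> "bar").asJava)

    assert(options.getBoolean("isFoo", false))  // key match is case-insensitive
    assert(options.getBoolean("isBar", true))   // missing key -> default value
    // "bar" is neither "true" nor "false": the old API returned false here,
    // the new one throws IllegalArgumentException.
    try {
      options.getBoolean("foo", true)
    } catch {
      case _: IllegalArgumentException => // expected
    }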
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
deleted file mode 100644
index 76392777d42a4..0000000000000
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class CaseInsensitiveStringMapSuite {
- @Test
- public void testPutAndGet() {
- CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
- options.put("kEy", "valUE");
-
- Assert.assertEquals("Should return correct value for lower-case key",
- "valUE", options.get("key"));
- Assert.assertEquals("Should return correct value for upper-case key",
- "valUE", options.get("KEY"));
- }
-
- @Test
- public void testKeySet() {
- CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
- options.put("kEy", "valUE");
-
- Set<String> expectedKeySet = new HashSet<>();
- expectedKeySet.add("key");
-
- Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet());
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
similarity index 52%
rename from sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index cfa69a86de1a7..623ddeb140254 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -15,31 +15,29 @@
* limitations under the License.
*/
-package org.apache.spark.sql.sources.v2
+package org.apache.spark.sql.util
import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
-/**
- * A simple test suite to verify `DataSourceOptions`.
- */
-class DataSourceOptionsSuite extends SparkFunSuite {
+class CaseInsensitiveStringMapSuite extends SparkFunSuite {
- test("key is case-insensitive") {
- val options = new DataSourceOptions(Map("foo" -> "bar").asJava)
- assert(options.get("foo").get() == "bar")
- assert(options.get("FoO").get() == "bar")
- assert(!options.get("abc").isPresent)
+ test("put and get") {
+ val options = CaseInsensitiveStringMap.empty()
+ options.put("kEy", "valUE")
+ assert(options.get("key") == "valUE")
+ assert(options.get("KEY") == "valUE")
}
- test("value is case-sensitive") {
- val options = new DataSourceOptions(Map("foo" -> "bAr").asJava)
- assert(options.get("foo").get == "bAr")
+ test("key and value set") {
+ val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
+ assert(options.keySet().asScala == Set("key"))
+ assert(options.values().asScala.toSeq == Seq("valUE"))
}
test("getInt") {
- val options = new DataSourceOptions(Map("numFOo" -> "1", "foo" -> "bar").asJava)
+ val options = new CaseInsensitiveStringMap(Map("numFOo" -> "1", "foo" -> "bar").asJava)
assert(options.getInt("numFOO", 10) == 1)
assert(options.getInt("numFOO2", 10) == 10)
@@ -49,17 +47,20 @@ class DataSourceOptionsSuite extends SparkFunSuite {
}
test("getBoolean") {
- val options = new DataSourceOptions(
+ val options = new CaseInsensitiveStringMap(
Map("isFoo" -> "true", "isFOO2" -> "false", "foo" -> "bar").asJava)
assert(options.getBoolean("isFoo", false))
assert(!options.getBoolean("isFoo2", true))
assert(options.getBoolean("isBar", true))
assert(!options.getBoolean("isBar", false))
- assert(!options.getBoolean("FOO", true))
+
+ intercept[IllegalArgumentException] {
+ options.getBoolean("FOO", true)
+ }
}
test("getLong") {
- val options = new DataSourceOptions(Map("numFoo" -> "9223372036854775807",
+ val options = new CaseInsensitiveStringMap(Map("numFoo" -> "9223372036854775807",
"foo" -> "bar").asJava)
assert(options.getLong("numFOO", 0L) == 9223372036854775807L)
assert(options.getLong("numFoo2", -1L) == -1L)
@@ -70,7 +71,7 @@ class DataSourceOptionsSuite extends SparkFunSuite {
}
test("getDouble") {
- val options = new DataSourceOptions(Map("numFoo" -> "922337.1",
+ val options = new CaseInsensitiveStringMap(Map("numFoo" -> "922337.1",
"foo" -> "bar").asJava)
assert(options.getDouble("numFOO", 0d) == 922337.1d)
assert(options.getDouble("numFoo2", -1.02d) == -1.02d)
@@ -79,29 +80,4 @@ class DataSourceOptionsSuite extends SparkFunSuite {
options.getDouble("foo", 0.1d)
}
}
-
- test("standard options") {
- val options = new DataSourceOptions(Map(
- DataSourceOptions.PATH_KEY -> "abc",
- DataSourceOptions.TABLE_KEY -> "tbl").asJava)
-
- assert(options.paths().toSeq == Seq("abc"))
- assert(options.tableName().get() == "tbl")
- assert(!options.databaseName().isPresent)
- }
-
- test("standard options with both singular path and multi-paths") {
- val options = new DataSourceOptions(Map(
- DataSourceOptions.PATH_KEY -> "abc",
- DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava)
-
- assert(options.paths().toSeq == Seq("abc", "c", "d"))
- }
-
- test("standard options with only multi-paths") {
- val options = new DataSourceOptions(Map(
- DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava)
-
- assert(options.paths().toSeq == Seq("c", "d\"e"))
- }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
deleted file mode 100644
index 00af0bf1b172c..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Stream;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
- * data source options.
- *
- * Each data source implementation can define its own options and teach its users how to set them.
- * Spark doesn't have any restrictions about what options a data source should or should not have.
- * Instead Spark defines some standard options that data sources can optionally adopt. It's possible
- * that some options are very common and many data sources use them. However different data
- * sources may define the common options(key and meaning) differently, which is quite confusing to
- * end users.
- *
- * The standard options defined by Spark:
- *
- * <table summary="standard data source options">
- *   <tr>
- *     <th><b>Option key</b></th>
- *     <th><b>Option value</b></th>
- *   </tr>
- *   <tr>
- *     <td>path</td>
- *     <td>A path string of the data files/directories, like
- *       <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can
- *       either be relative or absolute, points to either file or directory, and can contain
- *       wildcards. This option is commonly used by file-based data sources.</td>
- *   </tr>
- *   <tr>
- *     <td>paths</td>
- *     <td>A JSON array style paths string of the data files/directories, like
- *       <code>["path1", "/absolute/file2"]</code>. The format of each path is same as the
- *       <code>path</code> option, plus it should follow JSON string literal format, e.g. quotes
- *       should be escaped, <code>pa\"th</code> means pa"th.
- *     </td>
- *   </tr>
- *   <tr>
- *     <td>table</td>
- *     <td>A table name string representing the table name directly without any interpretation.
- *       For example, <code>db.tbl</code> means a table called db.tbl, not a table called tbl
- *       inside database db. <code>`t*b.l`</code> means a table called `t*b.l`, not t*b.l.</td>
- *   </tr>
- *   <tr>
- *     <td>database</td>
- *     <td>A database name string representing the database name directly without any
- *       interpretation, which is very similar to the table name option.</td>
- *   </tr>
- * </table>
- */
-@Evolving
-public class DataSourceOptions {
- private final Map<String, String> keyLowerCasedMap;
-
- private String toLowerCase(String key) {
- return key.toLowerCase(Locale.ROOT);
- }
-
- public static DataSourceOptions empty() {
- return new DataSourceOptions(new HashMap<>());
- }
-
- public DataSourceOptions(Map<String, String> originalMap) {
- keyLowerCasedMap = new HashMap<>(originalMap.size());
- for (Map.Entry<String, String> entry : originalMap.entrySet()) {
- keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
- }
- }
-
- public Map<String, String> asMap() {
- return new HashMap<>(keyLowerCasedMap);
- }
-
- /**
- * Returns the option value to which the specified key is mapped, case-insensitively.
- */
- public Optional<String> get(String key) {
- return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
- }
-
- /**
- * Returns the boolean value to which the specified key is mapped,
- * or defaultValue if there is no mapping for the key. The key match is case-insensitive
- */
- public boolean getBoolean(String key, boolean defaultValue) {
- String lcaseKey = toLowerCase(key);
- return keyLowerCasedMap.containsKey(lcaseKey) ?
- Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
- }
-
- /**
- * Returns the integer value to which the specified key is mapped,
- * or defaultValue if there is no mapping for the key. The key match is case-insensitive
- */
- public int getInt(String key, int defaultValue) {
- String lcaseKey = toLowerCase(key);
- return keyLowerCasedMap.containsKey(lcaseKey) ?
- Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
- }
-
- /**
- * Returns the long value to which the specified key is mapped,
- * or defaultValue if there is no mapping for the key. The key match is case-insensitive
- */
- public long getLong(String key, long defaultValue) {
- String lcaseKey = toLowerCase(key);
- return keyLowerCasedMap.containsKey(lcaseKey) ?
- Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
- }
-
- /**
- * Returns the double value to which the specified key is mapped,
- * or defaultValue if there is no mapping for the key. The key match is case-insensitive
- */
- public double getDouble(String key, double defaultValue) {
- String lcaseKey = toLowerCase(key);
- return keyLowerCasedMap.containsKey(lcaseKey) ?
- Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
- }
-
- /**
- * The option key for singular path.
- */
- public static final String PATH_KEY = "path";
-
- /**
- * The option key for multiple paths.
- */
- public static final String PATHS_KEY = "paths";
-
- /**
- * The option key for table name.
- */
- public static final String TABLE_KEY = "table";
-
- /**
- * The option key for database name.
- */
- public static final String DATABASE_KEY = "database";
-
- /**
- * The option key for whether to check existence of files for a table.
- */
- public static final String CHECK_FILES_EXIST_KEY = "check_files_exist";
-
- /**
- * Returns all the paths specified by both the singular path option and the multiple
- * paths option.
- */
- public String[] paths() {
- String[] singularPath =
- get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]);
- Optional<String> pathsStr = get(PATHS_KEY);
- if (pathsStr.isPresent()) {
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- String[] paths = objectMapper.readValue(pathsStr.get(), String[].class);
- return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new);
- } catch (IOException e) {
- return singularPath;
- }
- } else {
- return singularPath;
- }
- }
-
- /**
- * Returns the value of the table name option.
- */
- public Optional<String> tableName() {
- return get(TABLE_KEY);
- }
-
- /**
- * Returns the value of the database name option.
- */
- public Optional<String> databaseName() {
- return get(DATABASE_KEY);
- }
-
- public Boolean checkFilesExist() {
- Optional<String> result = get(CHECK_FILES_EXIST_KEY);
- return result.isPresent() && result.get().equals("true");
- }
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
index 6c5a95d2a75b7..ea7c5d2b108f0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
@@ -20,13 +20,14 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An empty mix-in interface for {@link Table}, to indicate this table supports batch scan.
*
* If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
- * builds {@link Scan} with {@link Scan#toBatch()} implemented.
+ * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
+ * that builds {@link Scan} with {@link Scan#toBatch()} implemented.
*
*/
@Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
index b2cd97a2f5332..09e23f84fd6bf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -19,13 +19,14 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
*
* If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
- * with {@link WriteBuilder#buildForBatch()} implemented.
+ * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
+ * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented.
*
*/
@Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
index b7fa3f24a238c..5cc9848d9da89 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
@@ -20,14 +20,15 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
* continuous mode.
*
* If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
- * builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
+ * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
+ * that builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
*
*/
@Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
index 9408e323f9da1..c98f3f1aa5cba 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
@@ -20,14 +20,15 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
* micro-batch mode.
*
* If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
- * builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
+ * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
+ * that builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
*
*/
@Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
index 5031c71c0fd4d..14990effeda37 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
@@ -19,11 +19,12 @@
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An internal base interface of mix-in interfaces for readable {@link Table}. This adds
- * {@link #newScanBuilder(DataSourceOptions)} that is used to create a scan for batch, micro-batch,
- * or continuous processing.
+ * {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan for batch,
+ * micro-batch, or continuous processing.
*/
interface SupportsRead extends Table {
@@ -34,5 +35,5 @@ interface SupportsRead extends Table {
* @param options The options for reading, which is an immutable case-insensitive
* string-to-string map.
*/
- ScanBuilder newScanBuilder(DataSourceOptions options);
+ ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
index 1050d35250c1f..ac11e483c18c4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
@@ -20,13 +20,14 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
*
* If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
- * with {@link WriteBuilder#buildForStreaming()} implemented.
+ * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
+ * {@link WriteBuilder} with {@link WriteBuilder#buildForStreaming()} implemented.
*
*/
@Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
index ecdfe20730254..f0d8e44f15287 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
@@ -19,10 +19,11 @@
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An internal base interface of mix-in interfaces for writable {@link Table}. This adds
- * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write
+ * {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* for batch or streaming.
*/
interface SupportsWrite extends Table {
@@ -31,5 +32,5 @@ interface SupportsWrite extends Table {
* Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
* this method to configure each data source write.
*/
- WriteBuilder newWriteBuilder(DataSourceOptions options);
+ WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
index a9b83b6de9950..04ad8fd90be9f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -20,6 +20,7 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* The base interface for v2 data sources which don't have a real catalog. Implementations must
@@ -37,7 +38,7 @@ public interface TableProvider {
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
*/
- Table getTable(DataSourceOptions options);
+ Table getTable(CaseInsensitiveStringMap options);
/**
* Return a {@link Table} instance to do read/write with user-specified schema and options.
@@ -50,7 +51,7 @@ public interface TableProvider {
* @param schema the user-specified schema.
* @throws UnsupportedOperationException
*/
- default Table getTable(DataSourceOptions options, StructType schema) {
+ default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
String name;
if (this instanceof DataSourceRegister) {
name = ((DataSourceRegister) this).shortName();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index a8562581ee85d..2cc9370346515 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -176,7 +177,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
def load(path: String): DataFrame = {
// force invocation of `load(...varargs...)`
- option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
+ option("path", path).load(Seq.empty: _*)
}
/**
@@ -206,20 +207,23 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
- val pathsOption = {
+ val pathsOption = if (paths.isEmpty) {
+ None
+ } else {
val objectMapper = new ObjectMapper()
- DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
+ Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
}
- val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "true"
- val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption
- val dsOptions = new DataSourceOptions(finalOptions.asJava)
+ // TODO SPARK-27113: remove this option.
+ val checkFilesExistsOpt = "check_files_exist" -> "true"
+ val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption + checkFilesExistsOpt
+ val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val table = userSpecifiedSchema match {
case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
}
table match {
case _: SupportsBatchRead =>
- Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
+ Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, dsOptions))
case _ => loadV1Source(paths: _*)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8d4d60ebdb547..e58225e0f58ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -260,12 +261,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, session.sessionState.conf)
- val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "false"
+ // TODO SPARK-27113: remove this option.
+ val checkFilesExistsOption = "check_files_exist" -> "false"
val options = sessionOptions ++ extraOptions + checkFilesExistsOption
- val dsOptions = new DataSourceOptions(options.asJava)
+ val dsOptions = new CaseInsensitiveStringMap(options.asJava)
provider.getTable(dsOptions) match {
case table: SupportsBatchWrite =>
- lazy val relation = DataSourceV2Relation.create(table, options)
+ lazy val relation = DataSourceV2Relation.create(table, dsOptions)
mode match {
case SaveMode.Append =>
runCommand(df.sparkSession, "save") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
index e22d6a6d399a5..7c72495548e3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,10 +35,15 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
*/
class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
+ case i @ InsertIntoTable(d @ DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
val v1FileFormat = new OrcFileFormat
- val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema,
- table.schema(), None, v1FileFormat, d.options)(sparkSession)
+ val relation = HadoopFsRelation(
+ table.fileIndex,
+ table.fileIndex.partitionSchema,
+ table.schema(),
+ None,
+ v1FileFormat,
+ d.options.asScala.toMap)(sparkSession)
i.copy(table = LogicalRelation(relation))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 22a74e3ccaeee..aa2a5e9a06fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* This is no-op datasource. It does not do anything besides consuming its input.
@@ -31,11 +32,11 @@ import org.apache.spark.sql.types.StructType
*/
class NoopDataSource extends TableProvider with DataSourceRegister {
override def shortName(): String = "noop"
- override def getTable(options: DataSourceOptions): Table = NoopTable
+ override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}
private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite {
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index c8542bfe5e59b..2081af35ce2d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.execution.datasources.v2
-import scala.collection.JavaConverters._
-
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
object DataSourceV2Implicits {
implicit class TableHelper(table: Table) {
@@ -42,8 +40,4 @@ object DataSourceV2Implicits {
}
}
}
-
- implicit class OptionsHelper(options: Map[String, String]) {
- def toDataSourceOptions: DataSourceOptions = new DataSourceOptions(options.asJava)
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 891694be46291..17407827d0564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Statistics => V2Statistics, _}
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A logical plan representing a data source v2 table.
@@ -36,7 +37,7 @@ import org.apache.spark.sql.sources.v2.writer._
case class DataSourceV2Relation(
table: Table,
output: Seq[AttributeReference],
- options: Map[String, String])
+ options: CaseInsensitiveStringMap)
extends LeafNode with MultiInstanceRelation with NamedRelation {
import DataSourceV2Implicits._
@@ -48,7 +49,7 @@ case class DataSourceV2Relation(
}
def newScanBuilder(): ScanBuilder = {
- table.asBatchReadable.newScanBuilder(options.toDataSourceOptions)
+ table.asBatchReadable.newScanBuilder(options)
}
override def computeStats(): Statistics = {
@@ -96,7 +97,7 @@ case class StreamingDataSourceV2Relation(
}
object DataSourceV2Relation {
- def create(table: Table, options: Map[String, String]): DataSourceV2Relation = {
+ def create(table: Table, options: CaseInsensitiveStringMap): DataSourceV2Relation = {
val output = table.schema().toAttributes
DataSourceV2Relation(table, output, options)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index bf606267aa34d..424fbed6fc1e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -148,8 +148,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
case AppendData(r: DataSourceV2Relation, query, _) =>
- AppendDataExec(
- r.table.asBatchWritable, r.options.toDataSourceOptions, planLater(query)) :: Nil
+ AppendDataExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
@@ -159,11 +158,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
}.toArray
OverwriteByExpressionExec(
- r.table.asBatchWritable, filters, r.options.toDataSourceOptions, planLater(query)) :: Nil
+ r.table.asBatchWritable, filters, r.options, planLater(query)) :: Nil
case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
- OverwritePartitionsDynamicExec(r.table.asBatchWritable,
- r.options.toDataSourceOptions, planLater(query)) :: Nil
+ OverwritePartitionsDynamicExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 06c57066aa240..e9c7a1bb749db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -16,10 +16,13 @@
*/
package org.apache.spark.sql.execution.datasources.v2
+import com.fasterxml.jackson.databind.ObjectMapper
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.TableProvider
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -35,4 +38,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
def fallBackFileFormat: Class[_ <: FileFormat]
lazy val sparkSession = SparkSession.active
+
+ protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
+ val objectMapper = new ObjectMapper()
+ Option(map.get("paths")).map { pathStr =>
+ objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
+ }.getOrElse {
+ Option(map.get("path")).toSeq
+ }
+ }
}
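The getPaths helper above mirrors the encoding side in DataFrameReader, where multiple paths are serialized into a single "paths" option as a JSON array string and a single path goes into "path". A minimal round-trip sketch with hypothetical paths:

    import scala.collection.JavaConverters._
    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val mapper = new ObjectMapper()
    val inputPaths = Seq("/data/part1", "/data/part2")  // hypothetical paths

    // Writing side (as in DataFrameReader): encode the path list as a JSON array string.
    val options = new CaseInsensitiveStringMap(
      Map("paths" -> mapper.writeValueAsString(inputPaths.toArray)).asJava)

    // Reading side (as in getPaths above): decode "paths", falling back to "path".
    val decoded: Seq[String] = Option(options.get("paths")).map { pathStr =>
      mapper.readValue(pathStr, classOf[Array[String]]).toSeq
    }.getOrElse(Option(options.get("path")).toSeq)

    assert(decoded == inputPaths)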
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 21d3e5e29cfb5..08873a3b5a643 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -22,23 +22,27 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
abstract class FileTable(
sparkSession: SparkSession,
- options: DataSourceOptions,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
userSpecifiedSchema: Option[StructType])
extends Table with SupportsBatchRead with SupportsBatchWrite {
+
lazy val fileIndex: PartitioningAwareFileIndex = {
- val filePaths = options.paths()
- val hadoopConf =
- sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
- val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
- checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+ val scalaMap = options.asScala.toMap
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
+ // This is an internal config so must be present.
+ val checkFilesExist = options.get("check_files_exist").toBoolean
+ val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
+ checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
- new InMemoryFileIndex(sparkSession, rootPathsSpecified,
- options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+ new InMemoryFileIndex(
+ sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
}
lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 75c922424e8ef..e16ee4c460f39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -33,12 +33,12 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
-abstract class FileWriteBuilder(options: DataSourceOptions)
+abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
extends WriteBuilder with SupportsSaveMode {
private var schema: StructType = _
private var queryId: String = _
@@ -61,18 +61,17 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
override def buildForBatch(): BatchWrite = {
validateInputs()
- val pathName = options.paths().head
- val path = new Path(pathName)
+ val path = new Path(paths.head)
val sparkSession = SparkSession.active
- val optionsAsScala = options.asMap().asScala.toMap
+ val optionsAsScala = options.asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
val job = getJobInstance(hadoopConf, path)
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
- outputPath = pathName)
+ outputPath = paths.head)
lazy val description =
- createWriteJobDescription(sparkSession, hadoopConf, job, pathName, optionsAsScala)
+ createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala)
val fs = path.getFileSystem(hadoopConf)
mode match {
@@ -127,7 +126,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
assert(schema != null, "Missing input data schema")
assert(queryId != null, "Missing query ID")
assert(mode != null, "Missing save mode")
- assert(options.paths().length == 1)
+ assert(paths.length == 1)
DataSource.validateSchema(schema)
schema.foreach { field =>
if (!supportsDataType(field.dataType)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index d7cb2457433b0..51606abdb563a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
+import org.apache.spark.sql.sources.v2.SupportsBatchWrite
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}
/**
@@ -53,7 +54,7 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
*/
case class AppendDataExec(
table: SupportsBatchWrite,
- writeOptions: DataSourceOptions,
+ writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
override protected def doExecute(): RDD[InternalRow] = {
@@ -81,7 +82,7 @@ case class AppendDataExec(
case class OverwriteByExpressionExec(
table: SupportsBatchWrite,
deleteWhere: Array[Filter],
- writeOptions: DataSourceOptions,
+ writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
private def isTruncate(filters: Array[Filter]): Boolean = {
@@ -118,7 +119,7 @@ case class OverwriteByExpressionExec(
*/
case class OverwritePartitionsDynamicExec(
table: SupportsBatchWrite,
- writeOptions: DataSourceOptions,
+ writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
override protected def doExecute(): RDD[InternalRow] = {
@@ -139,12 +140,9 @@ case class OverwritePartitionsDynamicExec(
case class WriteToDataSourceV2Exec(
batchWrite: BatchWrite,
- query: SparkPlan
- ) extends V2TableWriteExec {
+ query: SparkPlan) extends V2TableWriteExec {
- import DataSourceV2Implicits._
-
- def writeOptions: DataSourceOptions = Map.empty[String, String].toDataSourceOptions
+ def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()
override protected def doExecute(): RDD[InternalRow] = {
doWrite(batchWrite)
@@ -157,7 +155,7 @@ case class WriteToDataSourceV2Exec(
trait BatchWriteHelper {
def table: SupportsBatchWrite
def query: SparkPlan
- def writeOptions: DataSourceOptions
+ def writeOptions: CaseInsensitiveStringMap
def newWriteBuilder(): WriteBuilder = {
table.newWriteBuilder(writeOptions)
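
A small sketch (hypothetical option values) of the behaviour the write exec nodes now get from the shared options type: CaseInsensitiveStringMap.empty() is the no-options default that replaces the Map.empty[String, String].toDataSourceOptions conversion, and key lookups are case-insensitive.

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // The default used by WriteToDataSourceV2Exec above.
    val noOptions = CaseInsensitiveStringMap.empty()
    assert(noOptions.isEmpty)

    // Keys match case-insensitively, whatever casing the caller used.
    val writeOptions = new CaseInsensitiveStringMap(Map("Truncate" -> "true").asJava)
    assert(writeOptions.getBoolean("truncate", false))
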
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index f279af49ba9cf..900c94e937ffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.datasources.v2.orc
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table}
+import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
class OrcDataSourceV2 extends FileDataSourceV2 {
@@ -28,18 +29,20 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
override def shortName(): String = "orc"
- private def getTableName(options: DataSourceOptions): String = {
- shortName() + ":" + options.paths().mkString(";")
+ private def getTableName(paths: Seq[String]): String = {
+ shortName() + ":" + paths.mkString(";")
}
- override def getTable(options: DataSourceOptions): Table = {
- val tableName = getTableName(options)
- OrcTable(tableName, sparkSession, options, None)
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ OrcTable(tableName, sparkSession, options, paths, None)
}
- override def getTable(options: DataSourceOptions, schema: StructType): Table = {
- val tableName = getTableName(options)
- OrcTable(tableName, sparkSession, options, Some(schema))
+ override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ OrcTable(tableName, sparkSession, options, paths, Some(schema))
}
}
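
The table-name convention used by the new getTableName(paths) helper is simply the short name plus the paths joined with ';'; a tiny sketch with hypothetical paths:

    // Hypothetical inputs; mirrors shortName() + ":" + paths.mkString(";") above.
    val shortName = "orc"
    val paths = Seq("/data/t1", "/data/t2")
    val tableName = shortName + ":" + paths.mkString(";") // "orc:/data/t1;/data/t2"
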
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index eb27bbd3abeaa..0b153416b7bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -26,18 +26,17 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class OrcScanBuilder(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
schema: StructType,
dataSchema: StructType,
- options: DataSourceOptions) extends FileScanBuilder(schema) {
- lazy val hadoopConf =
- sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+ options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) {
+ lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index 249df8b8622fb..aac38fb3fa1ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -21,22 +21,24 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class OrcTable(
name: String,
sparkSession: SparkSession,
- options: DataSourceOptions,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
userSpecifiedSchema: Option[StructType])
- extends FileTable(sparkSession, options, userSpecifiedSchema) {
- override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder =
new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
OrcUtils.readSchema(sparkSession, files)
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
- new OrcWriteBuilder(options)
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+ new OrcWriteBuilder(options, paths)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
index 1aec4d872a64d..829ab5fbe1768 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
@@ -25,10 +25,12 @@ import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFac
import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils}
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class OrcWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
+ extends FileWriteBuilder(options, paths) {
-class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(options) {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index bedcb9f8d4e12..fdd80ccaf052e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -95,9 +95,8 @@ class MicroBatchExecution(
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
- val dsOptions = new DataSourceOptions(options.asJava)
// TODO: operator pushdown.
- val scan = table.newScanBuilder(dsOptions).build()
+ val scan = table.newScanBuilder(options).build()
val stream = scan.toMicroBatchStream(metadataPath)
StreamingDataSourceV2Relation(output, scan, stream)
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 180a23c765dd3..cc441937ce70c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -40,10 +40,11 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
/** States for [[StreamExecution]]'s lifecycle. */
@@ -584,7 +585,7 @@ abstract class StreamExecution(
table: SupportsStreamingWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
- val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
+ val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
.withQueryId(id.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 1b7aa548e6d21..0d7e9ba363d01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.v2.{Table, TableProvider}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -95,7 +96,7 @@ case class StreamingRelationV2(
source: TableProvider,
sourceName: String,
table: Table,
- extraOptions: Map[String, String],
+ extraOptions: CaseInsensitiveStringMap,
output: Seq[Attribute],
v1Relation: Option[StreamingRelation])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 923bd749b29b3..dbdfcf8085604 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
extends BaseRelation {
@@ -34,7 +35,7 @@ class ConsoleSinkProvider extends TableProvider
with DataSourceRegister
with CreatableRelationProvider {
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
ConsoleTable
}
@@ -62,7 +63,7 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
override def schema(): StructType = StructType(Nil)
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f55a45d2cee73..c8fb53df52598 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator
-import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.SparkEnv
@@ -33,7 +32,7 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@@ -71,9 +70,8 @@ class ContinuousExecution(
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
- val dsOptions = new DataSourceOptions(options.asJava)
// TODO: operator pushdown.
- val scan = table.newScanBuilder(dsOptions).build()
+ val scan = table.newScanBuilder(options).build()
val stream = scan.toContinuousStream(metadataPath)
StreamingDataSourceV2Relation(output, scan, stream)
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 48ff70f9c9d07..d55f71c7be830 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -23,17 +23,13 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
case class RateStreamPartitionOffset(
partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
-class RateStreamContinuousStream(
- rowsPerSecond: Long,
- numPartitions: Int,
- options: DataSourceOptions) extends ContinuousStream {
+class RateStreamContinuousStream(rowsPerSecond: Long, numPartitions: Int) extends ContinuousStream {
implicit val defaultFormats: DefaultFormats = DefaultFormats
val creationTime = System.currentTimeMillis()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index e7bc71394061e..2263b42870a65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -34,9 +34,9 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset => _, _}
import org.apache.spark.sql.execution.streaming.sources.TextSocketReader
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.RpcUtils
@@ -49,7 +49,7 @@ import org.apache.spark.util.RpcUtils
* buckets and serves the messages to the executors via an RPC endpoint.
*/
class TextSocketContinuousStream(
- host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+ host: String, port: Int, numPartitions: Int, options: CaseInsensitiveStringMap)
extends ContinuousStream with Logging {
implicit val defaultFormats: DefaultFormats = DefaultFormats
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index e71f81caeb974..df7990c6a652e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
object MemoryStream {
protected val currentBlockId = new AtomicInteger(0)
@@ -73,7 +74,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
MemoryStreamTableProvider,
"memory",
new MemoryStreamTable(this),
- Map.empty,
+ CaseInsensitiveStringMap.empty(),
attributes,
None)(sqlContext.sparkSession)
}
@@ -84,7 +85,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
// This class is used to indicate the memory stream data source. We don't actually use it, as
// memory stream is for test only and we never look it up by name.
object MemoryStreamTableProvider extends TableProvider {
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
throw new IllegalStateException("MemoryStreamTableProvider should not be used.")
}
}
@@ -96,7 +97,7 @@ class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
override def schema(): StructType = stream.fullSchema()
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MemoryStreamScanBuilder(stream)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
index f2ff30bcf1bef..dbe242784986d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/** Common methods used to create writes for the console sink */
-class ConsoleWrite(schema: StructType, options: DataSourceOptions)
+class ConsoleWrite(schema: StructType, options: CaseInsensitiveStringMap)
extends StreamingWrite with Logging {
// Number of rows to display, by default 20 rows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index c0ae44a128ca1..44516bbb2a5a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -22,10 +22,11 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A write-only table for forwarding data into the specified [[ForeachWriter]].
@@ -44,7 +45,7 @@ case class ForeachWriterTable[T](
override def schema(): StructType = StructType(Nil)
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index a8feed34b96dc..5403eafd54b61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{ManualClock, SystemClock}
class RateStreamMicroBatchStream(
@@ -38,7 +38,7 @@ class RateStreamMicroBatchStream(
// The default values here are used in tests.
rampUpTimeSeconds: Long = 0,
numPartitions: Int = 1,
- options: DataSourceOptions,
+ options: CaseInsensitiveStringMap,
checkpointLocation: String)
extends MicroBatchStream with Logging {
import RateStreamProvider._
@@ -155,7 +155,7 @@ class RateStreamMicroBatchStream(
override def toString: String = s"RateStreamV2[rowsPerSecond=$rowsPerSecond, " +
s"rampUpTimeSeconds=$rampUpTimeSeconds, " +
- s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
+ s"numPartitions=${options.getOrDefault(NUM_PARTITIONS, "default")}"
}
case class RateStreamMicroBatchInputPartition(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 3a0082536512d..3d8a90e99b85a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A source that generates incrementing long values with timestamps. Each generated row has two
@@ -43,14 +44,14 @@ import org.apache.spark.sql.types._
class RateStreamProvider extends TableProvider with DataSourceRegister {
import RateStreamProvider._
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
val rowsPerSecond = options.getLong(ROWS_PER_SECOND, 1)
if (rowsPerSecond <= 0) {
throw new IllegalArgumentException(
s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' must be positive")
}
- val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME).orElse(null))
+ val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME))
.map(JavaUtils.timeStringAsSec)
.getOrElse(0L)
if (rampUpTimeSeconds < 0) {
@@ -83,7 +84,7 @@ class RateStreamTable(
override def schema(): StructType = RateStreamProvider.SCHEMA
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new Scan {
override def readSchema(): StructType = RateStreamProvider.SCHEMA
@@ -93,7 +94,7 @@ class RateStreamTable(
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
- new RateStreamContinuousStream(rowsPerSecond, numPartitions, options)
+ new RateStreamContinuousStream(rowsPerSecond, numPartitions)
}
}
}
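
A brief sketch (hypothetical option values) of the two lookup styles used above: getLong for numeric options with a default, and Option(options.get(...)) now that get returns null rather than an Optional.

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val options = new CaseInsensitiveStringMap(
      Map("rowsPerSecond" -> "5", "rampUpTime" -> "10s").asJava)

    val rowsPerSecond = options.getLong("rowsPerSecond", 1)            // 5
    val rampUpTime = Option(options.get("rampUpTime")).getOrElse("0s") // "10s"
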
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index 540131c8de8a1..9168d46493aef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -29,7 +29,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.LongOffset
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.unsafe.types.UTF8String
@@ -39,8 +38,7 @@ import org.apache.spark.unsafe.types.UTF8String
* and debugging. This MicroBatchReadSupport will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*/
-class TextSocketMicroBatchStream(
- host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
extends MicroBatchStream with Logging {
@GuardedBy("this")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 8ac5bfc307aa3..0adbf1d9b3689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -30,20 +30,21 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
class TextSocketSourceProvider extends TableProvider with DataSourceRegister with Logging {
- private def checkParameters(params: DataSourceOptions): Unit = {
+ private def checkParameters(params: CaseInsensitiveStringMap): Unit = {
logWarning("The socket source should not be used for production applications! " +
"It does not support recovery.")
- if (!params.get("host").isPresent) {
+ if (!params.containsKey("host")) {
throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
}
- if (!params.get("port").isPresent) {
+ if (!params.containsKey("port")) {
throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
}
Try {
- params.get("includeTimestamp").orElse("false").toBoolean
+ params.getBoolean("includeTimestamp", false)
} match {
case Success(_) =>
case Failure(_) =>
@@ -51,10 +52,10 @@ class TextSocketSourceProvider extends TableProvider with DataSourceRegister wit
}
}
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
checkParameters(options)
new TextSocketTable(
- options.get("host").get,
+ options.get("host"),
options.getInt("port", -1),
options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism),
options.getBoolean("includeTimestamp", false))
@@ -77,12 +78,12 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
}
}
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new Scan {
override def readSchema(): StructType = schema()
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
- new TextSocketMicroBatchStream(host, port, numPartitions, options)
+ new TextSocketMicroBatchStream(host, port, numPartitions)
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
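
A minimal sketch (hypothetical values) of the new validation calls in checkParameters: containsKey replaces get(...).isPresent, and getBoolean parses the flag with a default (the surrounding Try relies on it rejecting malformed values such as the "fasle" case in the tests).

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val params = new CaseInsensitiveStringMap(
      Map("host" -> "localhost", "port" -> "9999").asJava)

    require(params.containsKey("host"), "Set a host to read from with option(\"host\", ...).")
    require(params.containsKey("port"), "Set a port to read from with option(\"port\", ...).")
    val includeTimestamp = params.getBoolean("includeTimestamp", false) // defaults to false
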
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 397c5ff0dcb6a..22adceba930fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
@@ -46,7 +47,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
override def schema(): StructType = StructType(Nil)
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
private var needTruncate: Boolean = false
private var inputSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 96b3a86f5df4d..01f29cdeddc2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRel
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@@ -175,7 +176,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
- val dsOptions = new DataSourceOptions(options.asJava)
+ val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = userSpecifiedSchema match {
case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
@@ -185,7 +186,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
- provider, source, table, options, table.schema.toAttributes, v1Relation)(
+ provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)(
sparkSession))
// fallback to v1
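
For reference, a small sketch (hypothetical keys) of how the merged session and per-query options are wrapped once and then carried on StreamingRelationV2 as dsOptions; lookups stay case-insensitive regardless of how the caller spelled the key.

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical option maps, merged the same way as sessionOptions ++ extraOptions above.
    val sessionOptions = Map("spark.datasource.foo.path" -> "/tmp/in")
    val extraOptions = Map("maxFilesPerTrigger" -> "10")
    val dsOptions = new CaseInsensitiveStringMap((sessionOptions ++ extraOptions).asJava)

    assert(dsOptions.get("MAXFILESPERTRIGGER") == "10") // keys are case-insensitive
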
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 984199488fa7b..33d032eb78c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, TableProvider}
+import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableProvider}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -313,7 +314,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = df.sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
- val dsOptions = new DataSourceOptions(options.asJava)
+ val dsOptions = new CaseInsensitiveStringMap(options.asJava)
provider.getTable(dsOptions) match {
case s: SupportsStreamingWrite => s
case _ => createV1Sink()
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 2612b6185fd4c..255a9f887878b 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -24,19 +24,19 @@
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class JavaAdvancedDataSourceV2 implements TableProvider {
@Override
- public Table getTable(DataSourceOptions options) {
+ public Table getTable(CaseInsensitiveStringMap options) {
return new JavaSimpleBatchTable() {
@Override
- public ScanBuilder newScanBuilder(DataSourceOptions options) {
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new AdvancedScanBuilder();
}
};
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
index d72ab5338aa8c..699859cfaebe1 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
@@ -21,11 +21,11 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -49,10 +49,10 @@ public PartitionReaderFactory createReaderFactory() {
}
@Override
- public Table getTable(DataSourceOptions options) {
+ public Table getTable(CaseInsensitiveStringMap options) {
return new JavaSimpleBatchTable() {
@Override
- public ScanBuilder newScanBuilder(DataSourceOptions options) {
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder();
}
};
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index a513bfb26ef1c..dfbea927e477b 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -22,13 +22,13 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class JavaPartitionAwareDataSource implements TableProvider {
@@ -54,10 +54,10 @@ public Partitioning outputPartitioning() {
}
@Override
- public Table getTable(DataSourceOptions options) {
+ public Table getTable(CaseInsensitiveStringMap options) {
return new JavaSimpleBatchTable() {
@Override
- public ScanBuilder newScanBuilder(DataSourceOptions options) {
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder();
}
};
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java
index bbc8492ec4e16..f3755e18b58d5 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java
@@ -19,13 +19,13 @@
import java.util.OptionalLong;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class JavaReportStatisticsDataSource implements TableProvider {
class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportStatistics {
@@ -54,10 +54,10 @@ public InputPartition[] planInputPartitions() {
}
@Override
- public Table getTable(DataSourceOptions options) {
+ public Table getTable(CaseInsensitiveStringMap options) {
return new JavaSimpleBatchTable() {
@Override
- public ScanBuilder newScanBuilder(DataSourceOptions options) {
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder();
}
};
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index 815d57ba94139..3800a94f88898 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -17,11 +17,11 @@
package test.org.apache.spark.sql.sources.v2;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class JavaSchemaRequiredDataSource implements TableProvider {
@@ -45,7 +45,7 @@ public InputPartition[] planInputPartitions() {
}
@Override
- public Table getTable(DataSourceOptions options, StructType schema) {
+ public Table getTable(CaseInsensitiveStringMap options, StructType schema) {
return new JavaSimpleBatchTable() {
@Override
@@ -54,14 +54,14 @@ public StructType schema() {
}
@Override
- public ScanBuilder newScanBuilder(DataSourceOptions options) {
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder(schema);
}
};
}
@Override
- public Table getTable(DataSourceOptions options) {
+ public Table getTable(CaseInsensitiveStringMap options) {
throw new IllegalArgumentException("requires a user-supplied schema");
}
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 852c4546df885..7474f36c97f75 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -17,10 +17,10 @@
package test.org.apache.spark.sql.sources.v2;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class JavaSimpleDataSourceV2 implements TableProvider {
@@ -36,10 +36,10 @@ public InputPartition[] planInputPartitions() {
}
@Override
- public Table getTable(DataSourceOptions options) {
+ public Table getTable(CaseInsensitiveStringMap options) {
return new JavaSimpleBatchTable() {
@Override
- public ScanBuilder newScanBuilder(DataSourceOptions options) {
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder();
}
};
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index cccd8e9ee8bd1..034454d21d7ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsR
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -58,7 +57,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
- val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
+ val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
assert(pushedFilters.nonEmpty, "No filter is pushed down")
@@ -102,7 +101,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
- val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
+ val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
if (noneSupported) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index d0418f893143e..c04f6e3f255cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ManualClock
class RateStreamProviderSuite extends StreamTest {
@@ -135,7 +135,7 @@ class RateStreamProviderSuite extends StreamTest {
withTempDir { temp =>
val stream = new RateStreamMicroBatchStream(
rowsPerSecond = 100,
- options = new DataSourceOptions(Map("useManualClock" -> "true").asJava),
+ options = new CaseInsensitiveStringMap(Map("useManualClock" -> "true").asJava),
checkpointLocation = temp.getCanonicalPath)
stream.clock.asInstanceOf[ManualClock].advance(100000)
val startOffset = stream.initialOffset()
@@ -154,7 +154,7 @@ class RateStreamProviderSuite extends StreamTest {
withTempDir { temp =>
val stream = new RateStreamMicroBatchStream(
rowsPerSecond = 20,
- options = DataSourceOptions.empty(),
+ options = CaseInsensitiveStringMap.empty(),
checkpointLocation = temp.getCanonicalPath)
val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L))
val readerFactory = stream.createReaderFactory()
@@ -173,7 +173,7 @@ class RateStreamProviderSuite extends StreamTest {
val stream = new RateStreamMicroBatchStream(
rowsPerSecond = 33,
numPartitions = 11,
- options = DataSourceOptions.empty(),
+ options = CaseInsensitiveStringMap.empty(),
checkpointLocation = temp.getCanonicalPath)
val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L))
val readerFactory = stream.createReaderFactory()
@@ -309,8 +309,7 @@ class RateStreamProviderSuite extends StreamTest {
}
test("continuous data") {
- val stream = new RateStreamContinuousStream(
- rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty())
+ val stream = new RateStreamContinuousStream(rowsPerSecond = 20, numPartitions = 2)
val partitions = stream.planInputPartitions(stream.initialOffset)
val readerFactory = stream.createContinuousReaderFactory()
assert(partitions.size == 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index e1769fb0b2881..a5ba4f9633e7b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -35,11 +35,11 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
@@ -176,13 +176,13 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
test("params not given") {
val provider = new TextSocketSourceProvider
intercept[AnalysisException] {
- provider.getTable(new DataSourceOptions(Map.empty[String, String].asJava))
+ provider.getTable(CaseInsensitiveStringMap.empty())
}
intercept[AnalysisException] {
- provider.getTable(new DataSourceOptions(Map("host" -> "localhost").asJava))
+ provider.getTable(new CaseInsensitiveStringMap(Map("host" -> "localhost").asJava))
}
intercept[AnalysisException] {
- provider.getTable(new DataSourceOptions(Map("port" -> "1234").asJava))
+ provider.getTable(new CaseInsensitiveStringMap(Map("port" -> "1234").asJava))
}
}
@@ -190,7 +190,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
val provider = new TextSocketSourceProvider
val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
intercept[AnalysisException] {
- provider.getTable(new DataSourceOptions(params.asJava))
+ provider.getTable(new CaseInsensitiveStringMap(params.asJava))
}
}
@@ -201,7 +201,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
StructField("area", StringType) :: Nil)
val params = Map("host" -> "localhost", "port" -> "1234")
val exception = intercept[UnsupportedOperationException] {
- provider.getTable(new DataSourceOptions(params.asJava), userSpecifiedSchema)
+ provider.getTable(new CaseInsensitiveStringMap(params.asJava), userSpecifiedSchema)
}
assert(exception.getMessage.contains(
"socket source does not support user-specified schema"))
@@ -299,7 +299,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
host = "localhost",
port = serverThread.port,
numPartitions = 2,
- options = DataSourceOptions.empty())
+ options = CaseInsensitiveStringMap.empty())
val partitions = stream.planInputPartitions(stream.initialOffset())
assert(partitions.length == 2)
@@ -351,7 +351,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
host = "localhost",
port = serverThread.port,
numPartitions = 2,
- options = DataSourceOptions.empty())
+ options = CaseInsensitiveStringMap.empty())
stream.startOffset = TextSocketOffset(List(5, 5))
assertThrows[IllegalStateException] {
@@ -367,7 +367,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
host = "localhost",
port = serverThread.port,
numPartitions = 2,
- options = new DataSourceOptions(Map("includeTimestamp" -> "true").asJava))
+ options = new CaseInsensitiveStringMap(Map("includeTimestamp" -> "true").asJava))
val partitions = stream.planInputPartitions(stream.initialOffset())
assert(partitions.size == 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index e184bf57fa7d2..705559d099bec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
class DataSourceV2Suite extends QueryTest with SharedSQLContext {
@@ -349,7 +350,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
val options = df.queryExecution.optimizedPlan.collectFirst {
case d: DataSourceV2Relation => d.options
}.get
- assert(options(optionName) === "false")
+ assert(options.get(optionName) === "false")
}
}
@@ -437,8 +438,8 @@ class SimpleSinglePartitionSource extends TableProvider {
}
}
- override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder()
}
}
@@ -454,8 +455,8 @@ class SimpleDataSourceV2 extends TableProvider {
}
}
- override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder()
}
}
@@ -463,8 +464,8 @@ class SimpleDataSourceV2 extends TableProvider {
class AdvancedDataSourceV2 extends TableProvider {
- override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new AdvancedScanBuilder()
}
}
@@ -559,16 +560,16 @@ class SchemaRequiredDataSource extends TableProvider {
override def readSchema(): StructType = schema
}
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
throw new IllegalArgumentException("requires a user-supplied schema")
}
- override def getTable(options: DataSourceOptions, schema: StructType): Table = {
+ override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val userGivenSchema = schema
new SimpleBatchTable {
override def schema(): StructType = userGivenSchema
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder(userGivenSchema)
}
}
@@ -588,8 +589,8 @@ class ColumnarDataSourceV2 extends TableProvider {
}
}
- override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder()
}
}
@@ -659,8 +660,8 @@ class PartitionAwareDataSource extends TableProvider {
override def outputPartitioning(): Partitioning = new MyPartitioning
}
- override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder()
}
}
@@ -699,7 +700,7 @@ class SchemaReadAttemptException(m: String) extends RuntimeException(m)
class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new MyTable(options) {
override def schema(): StructType = {
throw new SchemaReadAttemptException("schema should not be read.")
@@ -725,9 +726,9 @@ class ReportStatisticsDataSource extends TableProvider {
}
}
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder
}
}
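The DataSourceV2Suite changes above are uniform: every test provider swaps DataSourceOptions for CaseInsensitiveStringMap in its getTable and newScanBuilder signatures, and the relation's options are now read with java.util.Map-style get. A condensed sketch of that pattern; ExampleSingleScanSource is a hypothetical name, and SimpleBatchTable and MyScanBuilder are the suite's existing helpers, so this only compiles inside DataSourceV2Suite.scala (where only the CaseInsensitiveStringMap import is new):

    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    class ExampleSingleScanSource extends TableProvider {
      // Both entry points now take the case-insensitive map; the bodies are unchanged.
      override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
        override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
          new MyScanBuilder()
        }
      }
    }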
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index fd19a48497fe6..f9f9db35ac2dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -18,13 +18,14 @@ package org.apache.spark.sql.sources.v2
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
@@ -32,7 +33,7 @@ class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
override def shortName(): String = "parquet"
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new DummyReadOnlyFileTable
}
}
@@ -42,7 +43,7 @@ class DummyReadOnlyFileTable extends Table with SupportsBatchRead {
override def schema(): StructType = StructType(Nil)
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
throw new AnalysisException("Dummy file reader")
}
}
@@ -53,7 +54,7 @@ class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
override def shortName(): String = "parquet"
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new DummyWriteOnlyFileTable
}
}
@@ -63,7 +64,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsBatchWrite {
override def schema(): StructType = StructType(Nil)
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
throw new AnalysisException("Dummy file writer")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index c56a54598cd4c..160354520e432 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -25,12 +25,12 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
-import org.apache.spark.internal.config.SPECULATION_ENABLED
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
/**
@@ -141,22 +141,24 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
}
}
- class MyTable(options: DataSourceOptions) extends SimpleBatchTable with SupportsBatchWrite {
- private val path = options.get("path").get()
+ class MyTable(options: CaseInsensitiveStringMap)
+ extends SimpleBatchTable with SupportsBatchWrite {
+
+ private val path = options.get("path")
private val conf = SparkContext.getActive.get.hadoopConfiguration
override def schema(): StructType = tableSchema
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder(new Path(path).toUri.toString, conf)
}
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new MyWriteBuilder(path)
}
}
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new MyTable(options)
}
}
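The MyTable change above is the one place in these tests where the accessor semantics change along with the type: the old options.get("path").get() unwrapped an Optional, while the new map hands the string back directly, or null when the key is absent. A small sketch of the new lookup, using a hypothetical path value and a hypothetical "compression" option purely for illustration:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val options = new CaseInsensitiveStringMap(Map("path" -> "/tmp/example-output").asJava)

    // Present key: the value comes back directly, with no Optional to unwrap.
    val path: String = options.get("path")

    // Absent key: get returns null, so wrap in Option when a fallback is needed.
    val compression = Option(options.get("compression")).getOrElse("none")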
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 3c2c7004fcd2b..13bb686fbd3b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
class FakeDataStream extends MicroBatchStream with ContinuousStream {
@@ -76,19 +77,19 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
}
trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
}
trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
- override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new FakeWriteBuilder
}
}
@@ -101,7 +102,7 @@ class FakeReadMicroBatchOnly
override def keyPrefix: String = shortName()
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
LastReadOptions.options = options
new FakeMicroBatchReadTable {}
}
@@ -115,7 +116,7 @@ class FakeReadContinuousOnly
override def keyPrefix: String = shortName()
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
LastReadOptions.options = options
new FakeContinuousReadTable {}
}
@@ -124,7 +125,7 @@ class FakeReadContinuousOnly
class FakeReadBothModes extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-microbatch-continuous"
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
}
}
@@ -132,7 +133,7 @@ class FakeReadBothModes extends DataSourceRegister with TableProvider {
class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-neither-mode"
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new Table {
override def name(): String = "fake"
override def schema(): StructType = StructType(Nil)
@@ -148,7 +149,7 @@ class FakeWriteOnly
override def keyPrefix: String = shortName()
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
LastWriteOptions.options = options
new Table with FakeStreamingWriteTable {
override def name(): String = "fake"
@@ -159,7 +160,7 @@ class FakeWriteOnly
class FakeNoWrite extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-write-neither-mode"
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new Table {
override def name(): String = "fake"
override def schema(): StructType = StructType(Nil)
@@ -186,7 +187,7 @@ class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
override def shortName(): String = "fake-write-v1-fallback"
- override def getTable(options: DataSourceOptions): Table = {
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
new Table with FakeStreamingWriteTable {
override def name(): String = "fake"
override def schema(): StructType = StructType(Nil)
@@ -195,7 +196,7 @@ class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
}
object LastReadOptions {
- var options: DataSourceOptions = _
+ var options: CaseInsensitiveStringMap = _
def clear(): Unit = {
options = null
@@ -203,7 +204,7 @@ object LastReadOptions {
}
object LastWriteOptions {
- var options: DataSourceOptions = _
+ var options: CaseInsensitiveStringMap = _
def clear(): Unit = {
options = null
@@ -320,7 +321,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
eventually(timeout(streamingTimeout)) {
// Write options should not be set.
- assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false)
+ assert(!LastWriteOptions.options.containsKey(readOptionName))
assert(LastReadOptions.options.getBoolean(readOptionName, false))
}
}
@@ -331,7 +332,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
eventually(timeout(streamingTimeout)) {
// Read options should not be set.
- assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false)
+ assert(!LastReadOptions.options.containsKey(writeOptionName))
assert(LastWriteOptions.options.getBoolean(writeOptionName, false))
}
}
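The two assertion rewrites above lean on the map being a plain java.util.Map: an option that was never set is simply absent, so the tests check containsKey rather than probing getBoolean for its default. A minimal sketch with a hypothetical option name:

    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val opts = CaseInsensitiveStringMap.empty()

    // Absence is asserted directly on the map...
    assert(!opts.containsKey("optionOnlyForReads"))

    // ...while getBoolean still parses a present value and otherwise falls back
    // to the supplied default, as the unchanged assertions above continue to do.
    assert(!opts.getBoolean("optionOnlyForReads", false))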
@@ -351,10 +352,10 @@ class StreamingDataSourceV2Suite extends StreamTest {
for ((read, write, trigger) <- cases) {
testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
- .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+ .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty())
val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
- .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+ .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty())
(sourceTable, sinkTable, trigger) match {
// Valid microbatch queries.