22 commits
456d2f3
fix problems that affect windows shell environments (cygwin/msys2/mingw)
philwalk Oct 8, 2022
89adbbd
unset SHELL variable in spark-class.cmd
philwalk Oct 10, 2022
9e8198d
[SPARK-40726][DOCS] Supplement undocumented orc configurations in doc…
dcoliversun Oct 10, 2022
9a97f8c
[SPARK-40705][SQL] Handle case of using mutable array when converting…
Amraneze Oct 10, 2022
288bdd2
[SPARK-40714][SQL] Remove `PartitionAlreadyExistsException`
MaxGekk Oct 10, 2022
67c6408
[SPARK-40534][CONNECT] Extend the support for Join with different joi…
amaliujia Oct 11, 2022
f8d68b0
[SPARK-40725][INFRA][FOLLOWUP] Mark mypy-protobuf as an optional depe…
amaliujia Oct 11, 2022
4eb0edf
[SPARK-40596][CORE] Populate ExecutorDecommission with messages in Ex…
bozhang2820 Oct 11, 2022
e927a7e
[SPARK-40677][CONNECT][FOLLOWUP] Refactor shade `relocation/rename` r…
LuciferYang Oct 11, 2022
d59f71c
[SPARK-40698][PS][SQL] Improve the precision of `product` for integra…
zhengruifeng Oct 11, 2022
4e4a848
[SPARK-40707][CONNECT] Add groupby to connect DSL and test more than …
amaliujia Oct 11, 2022
47d119d
[SPARK-40358][SQL] Migrate collection type check failures onto error …
lvshaokang Oct 11, 2022
9ddd734
[SPARK-40740][SQL] Improve listFunctions in SessionCatalog
allisonwang-db Oct 11, 2022
8e85393
[SPARK-40667][SQL] Refactor File Data Source Options
xiaonanyang-db Oct 11, 2022
d94c65e
[SPARK-40717][CONNECT] Support Column Alias in the Connect DSL
amaliujia Oct 11, 2022
6c182da
[SPARK-40744][PS] Make `_reduce_for_stat_function` in `groupby` accep…
zhengruifeng Oct 11, 2022
8e31554
[SPARK-40742][CORE][SQL] Fix Java compilation warnings related to gen…
LuciferYang Oct 11, 2022
efd9ef9
[SPARK-40735] Consistently invoke bash with /usr/bin/env bash in scri…
huangxiaopingRD Oct 11, 2022
996e407
[SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGA…
itholic Oct 11, 2022
c6a9569
fix problems that affect windows shell environments (cygwin/msys2/mingw)
philwalk Oct 8, 2022
c99e267
unset SHELL variable in spark-class.cmd
philwalk Oct 10, 2022
1c448d6
rebase to trigger build
philwalk Oct 11, 2022
2 changes: 1 addition & 1 deletion R/check-cran.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
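The same one-line change from `#!/bin/bash` to `#!/usr/bin/env bash` recurs across the scripts below (SPARK-40735 in the commit list). A minimal sketch of why the env form is the portable one, with illustrative paths:

#!/usr/bin/env bash
# env resolves bash through PATH instead of assuming it lives at /bin/bash.
# On BSDs, NixOS, or Windows POSIX layers (cygwin/msys2/mingw), bash is often
# installed elsewhere, so a hard-coded /bin/bash shebang can fail outright.
command -v bash   # shows the bash that env would select, e.g. /usr/bin/bash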
2 changes: 1 addition & 1 deletion R/create-docs.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/create-rd.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/find-r.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/install-dev.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/install-source-package.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
3 changes: 2 additions & 1 deletion bin/spark-class
@@ -77,7 +77,8 @@ set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
while IFS= read -d "$DELIM" -r _ARG; do
ARG=${_ARG//$'\r'} # on Windows, args can have a trailing CR
if [ "$CMD_START_FLAG" == "true" ]; then
CMD+=("$ARG")
else
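A minimal sketch of the CR-stripping idiom above, assuming input that carries Windows CR+LF line endings:

#!/usr/bin/env bash
# Under cygwin/msys2, output read from a native Windows process can end each
# line with CR+LF; `read` strips the LF but leaves a trailing CR on the value.
_ARG=$'--class\r'
ARG=${_ARG//$'\r'}       # delete every carriage return
printf '[%s]\n' "$ARG"   # prints [--class], with no stray CR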
3 changes: 3 additions & 0 deletions bin/spark-class.cmd
@@ -22,4 +22,7 @@ rem the environment, it just launches a new cmd to do the real work.

rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.

rem SHELL must be unset in non-SHELL environment
set SHELL=
cmd /V /E /C ""%~dp0spark-class2.cmd" %*"
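For context on `set SHELL=`: POSIX shells on Windows export SHELL, and environment variables are inherited by child processes, so the value would otherwise leak into the spawned cmd.exe and make downstream scripts conclude they are running in a POSIX environment. A small illustrative check from the launching shell:

# Under msys2/cygwin the launching shell typically has SHELL set:
echo "$SHELL"   # e.g. /usr/bin/bash
# cmd.exe inherits it unless cleared, hence `set SHELL=` above.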
2 changes: 1 addition & 1 deletion bin/sparkR
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion binder/postBuild
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion build/spark-build-info
@@ -24,7 +24,7 @@

RESOURCE_DIR="$1"
mkdir -p "$RESOURCE_DIR"
SPARK_BUILD_INFO="${RESOURCE_DIR}"/spark-version-info.properties
SPARK_BUILD_INFO="${RESOURCE_DIR%/}"/spark-version-info.properties

echo_build_properties() {
echo version=$1
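A minimal sketch of the `${VAR%/}` expansion introduced above, assuming a caller that passes the resource directory with a trailing slash:

#!/usr/bin/env bash
# ${RESOURCE_DIR%/} removes a single trailing slash, so the generated path
# never contains a doubled "//".
RESOURCE_DIR="target/extra-resources/"   # hypothetical input
printf '%s\n' "${RESOURCE_DIR%/}/spark-version-info.properties"
# -> target/extra-resources/spark-version-info.properties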
SparkAvroKeyOutputFormat.java
@@ -25,6 +25,7 @@
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
@@ -53,7 +54,7 @@ protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
CodecFactory compressionCodec,
OutputStream outputStream,
int syncInterval) throws IOException {
return new SparkAvroKeyRecordWriter(
return new SparkAvroKeyRecordWriter<>(
writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
}
}
@@ -72,7 +73,7 @@ class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
OutputStream outputStream,
int syncInterval,
Map<String, String> metadata) throws IOException {
this.mAvroFileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
this.mAvroFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(writerSchema, dataModel));
for (Map.Entry<String, String> entry : metadata.entrySet()) {
this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
}
AvroOptions.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
import org.apache.spark.sql.internal.SQLConf

@@ -37,6 +37,8 @@ private[sql] class AvroOptions(
@transient val conf: Configuration)
extends FileSourceOptions(parameters) with Logging {

import AvroOptions._

def this(parameters: Map[String, String], conf: Configuration) = {
this(CaseInsensitiveMap(parameters), conf)
}
@@ -54,8 +56,8 @@
* instead of "string" type in the default converted schema.
*/
val schema: Option[Schema] = {
parameters.get("avroSchema").map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
val avroUrlSchema = parameters.get("avroSchemaUrl").map(url => {
parameters.get(AVRO_SCHEMA).map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
val avroUrlSchema = parameters.get(AVRO_SCHEMA_URL).map(url => {
log.debug("loading avro schema from url: " + url)
val fs = FileSystem.get(new URI(url), conf)
val in = fs.open(new Path(url))
@@ -75,20 +77,20 @@
* whose field names do not match. Defaults to false.
*/
val positionalFieldMatching: Boolean =
parameters.get("positionalFieldMatching").exists(_.toBoolean)
parameters.get(POSITIONAL_FIELD_MATCHING).exists(_.toBoolean)

/**
* Top level record name in write result, which is required in Avro spec.
* See https://avro.apache.org/docs/1.11.1/specification/#schema-record .
* Default value is "topLevelRecord"
*/
val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
val recordName: String = parameters.getOrElse(RECORD_NAME, "topLevelRecord")

/**
* Record namespace in write result. Default value is "".
* See Avro spec for details: https://avro.apache.org/docs/1.11.1/specification/#schema-record .
*/
val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
val recordNamespace: String = parameters.getOrElse(RECORD_NAMESPACE, "")

/**
* The `ignoreExtension` option controls ignoring of files without `.avro` extensions in read.
@@ -104,7 +106,7 @@
ignoreFilesWithoutExtensionByDefault)

parameters
.get(AvroOptions.ignoreExtensionKey)
.get(IGNORE_EXTENSION)
.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
}
@@ -116,21 +118,21 @@
* taken into account. If the former one is not set too, the `snappy` codec is used by default.
*/
val compression: String = {
parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
}

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
parameters.get(MODE).map(ParseMode.fromString).getOrElse(FailFastMode)

/**
* The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
*/
val datetimeRebaseModeInRead: String = parameters
.get(AvroOptions.DATETIME_REBASE_MODE)
.get(DATETIME_REBASE_MODE)
.getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
}

private[sql] object AvroOptions {
private[sql] object AvroOptions extends DataSourceOptions {
def apply(parameters: Map[String, String]): AvroOptions = {
val hadoopConf = SparkSession
.getActiveSession
@@ -139,11 +141,17 @@
new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
}

val ignoreExtensionKey = "ignoreExtension"

val IGNORE_EXTENSION = newOption("ignoreExtension")
val MODE = newOption("mode")
val RECORD_NAME = newOption("recordName")
val COMPRESSION = newOption("compression")
val AVRO_SCHEMA = newOption("avroSchema")
val AVRO_SCHEMA_URL = newOption("avroSchemaUrl")
val RECORD_NAMESPACE = newOption("recordNamespace")
val POSITIONAL_FIELD_MATCHING = newOption("positionalFieldMatching")
// The option controls rebasing of the DATE and TIMESTAMP values between
// Julian and Proleptic Gregorian calendars. It impacts the behaviour of the Avro
// datasource similarly to the SQL config `spark.sql.avro.datetimeRebaseModeInRead`,
// and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val DATETIME_REBASE_MODE = "datetimeRebaseMode"
val DATETIME_REBASE_MODE = newOption("datetimeRebaseMode")
}
AvroUtils.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
@@ -50,8 +50,8 @@ private[sql] object AvroUtils extends Logging {
val conf = spark.sessionState.newHadoopConfWithOptions(options)
val parsedOptions = new AvroOptions(options, conf)

if (parsedOptions.parameters.contains(ignoreExtensionKey)) {
logWarning(s"Option $ignoreExtensionKey is deprecated. Please use the " +
if (parsedOptions.parameters.contains(IGNORE_EXTENSION)) {
logWarning(s"Option $IGNORE_EXTENSION is deprecated. Please use the " +
"general data source option pathGlobFilter for filtering file names.")
}
// User can specify an optional avro json schema.
AvroSuite.scala
@@ -1075,7 +1075,7 @@ abstract class AvroSuite
.save(s"$tempDir/${UUID.randomUUID()}")
}.getMessage
assert(message.contains("Caused by: java.lang.NullPointerException: "))
assert(message.contains("null in string in field Name"))
assert(message.contains("null value for (non-nullable) string at test_schema.Name"))
}
}

@@ -1804,13 +1804,13 @@ abstract class AvroSuite
spark
.read
.format("avro")
.option(AvroOptions.ignoreExtensionKey, false)
.option(AvroOptions.IGNORE_EXTENSION, false)
.load(dir.getCanonicalPath)
.count()
}
val deprecatedEvents = logAppender.loggingEvents
.filter(_.getMessage.getFormattedMessage.contains(
s"Option ${AvroOptions.ignoreExtensionKey} is deprecated"))
s"Option ${AvroOptions.IGNORE_EXTENSION} is deprecated"))
assert(deprecatedEvents.size === 1)
}
}
@@ -2272,6 +2272,20 @@ abstract class AvroSuite
checkAnswer(df2, df.collect().toSeq)
}
}

test("SPARK-40667: validate Avro Options") {
assert(AvroOptions.getAllOptions.size == 9)
// Please add validation on any new Avro options here
assert(AvroOptions.isValidOption("ignoreExtension"))
assert(AvroOptions.isValidOption("mode"))
assert(AvroOptions.isValidOption("recordName"))
assert(AvroOptions.isValidOption("compression"))
assert(AvroOptions.isValidOption("avroSchema"))
assert(AvroOptions.isValidOption("avroSchemaUrl"))
assert(AvroOptions.isValidOption("recordNamespace"))
assert(AvroOptions.isValidOption("positionalFieldMatching"))
assert(AvroOptions.isValidOption("datetimeRebaseMode"))
}
}

class AvroV1Suite extends AvroSuite {
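To run the new option-validation test locally, something like the following should work (module name and test pattern are assumptions based on the Spark sbt layout):

# Run the Avro suites (AvroV1Suite/AvroV2Suite extend AvroSuite)
build/sbt "avro/testOnly *AvroSuite*"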
2 changes: 2 additions & 0 deletions connector/connect/dev/generate_protos.sh
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
66 changes: 53 additions & 13 deletions connector/connect/pom.xml
@@ -268,11 +268,13 @@
as assembly build.
-->
<include>com.google.android:annotations</include>
<include>com.google.api.grpc:proto-google-common-proto</include>
<include>com.google.api.grpc:proto-google-common-protos</include>
<include>io.perfmark:perfmark-api</include>
<include>org.codehaus.mojo:animal-sniffer-annotations</include>
<include>com.google.errorprone:error_prone_annotations</include>
<include>com.google.j2objc:j2objc-annotations</include>
<include>org.checkerframework:checker-qual</include>
<include>com.google.code.gson:gson</include>
</includes>
</artifactSet>
<relocations>
@@ -303,28 +305,66 @@
</relocation>

<relocation>
<pattern>com.google.android</pattern>
<shadedPattern>${spark.shade.packageName}.connect.android</shadedPattern>
<pattern>android.annotation</pattern>
<shadedPattern>${spark.shade.packageName}.connect.android_annotation</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.api.grpc</pattern>
<shadedPattern>${spark.shade.packageName}.connect.api</shadedPattern>
<pattern>io.perfmark</pattern>
<shadedPattern>${spark.shade.packageName}.connect.io_perfmark</shadedPattern>
</relocation>
<relocation>
<pattern>io.perfmark</pattern>
<shadedPattern>${spark.shade.packageName}.connect.perfmark</shadedPattern>
<pattern>org.codehaus.mojo.animal_sniffer</pattern>
<shadedPattern>${spark.shade.packageName}.connect.animal_sniffer</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.j2objc.annotations</pattern>
<shadedPattern>${spark.shade.packageName}.connect.j2objc_annotations</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.errorprone.annotations</pattern>
<shadedPattern>${spark.shade.packageName}.connect.errorprone_annotations</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>${spark.shade.packageName}.connect.checkerframework</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.gson</pattern>
<shadedPattern>${spark.shade.packageName}.connect.gson</shadedPattern>
</relocation>

<!--
For `com.google.api.grpc:proto-google-common-protos`, do not directly define the pattern
as `com.google`; otherwise the relocation result may be uncertain due to the order in
which the relocation rules are applied.
-->
<relocation>
<pattern>com.google.api</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.api</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.cloud</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.cloud</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.geo</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.geo</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.logging</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.logging</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus.mojo</pattern>
<shadedPattern>${spark.shade.packageName}.connect.mojo</shadedPattern>
<pattern>com.google.longrunning</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.longrunning</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.errorprone</pattern>
<shadedPattern>${spark.shade.packageName}.connect.errorprone</shadedPattern>
<pattern>com.google.rpc</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.rpc</shadedPattern>
</relocation>
<relocation>
<pattern>com.com.google.j2objc</pattern>
<shadedPattern>${spark.shade.packageName}.connect.j2objc</shadedPattern>
<pattern>com.google.type</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.type</shadedPattern>
</relocation>
</relocations>
</configuration>
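A quick way to sanity-check the relocation rules above is to list the shaded jar and confirm the third-party packages landed under the shade root (the build invocation, jar name, and the `org.sparkproject` prefix are assumptions):

# Build the connect module, then inspect the shaded artifact:
build/mvn -pl connector/connect -am -DskipTests package
jar tf connector/connect/target/spark-connect_2.12-*.jar \
  | grep 'org/sparkproject/connect/google_protos/' | head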
commands.proto
@@ -44,8 +44,8 @@ message CreateScalarFunction {
repeated string parts = 1;
FunctionLanguage language = 2;
bool temporary = 3;
repeated Type argument_types = 4;
Type return_type = 5;
repeated DataType argument_types = 4;
DataType return_type = 5;

// How the function body is defined:
oneof function_definition {
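After a proto change such as the `Type` to `DataType` rename above, the generated sources need to be regenerated; the dev script added in this PR appears to be the entry point for that:

# Regenerate Spark Connect protobuf/gRPC sources after editing .proto files
connector/connect/dev/generate_protos.sh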