Commit

Merge branch 'apache:master' into SPARK-45636
LuciferYang authored Oct 27, 2023
2 parents c00059e + 3d6e31a commit bea557c
Showing 93 changed files with 776 additions and 500 deletions.
11 changes: 2 additions & 9 deletions .github/workflows/publish_snapshot.yml
@@ -70,12 +70,5 @@ jobs:
GPG_KEY: "not_used"
GPG_PASSPHRASE: "not_used"
GIT_REF: ${{ matrix.branch }}
run: |
while true
do
date
top -b -n 1 -i
sleep 1
echo
done | sed "s/^/mem: /" &
./dev/create-release/release-build.sh publish-snapshot
MAVEN_MXM_OPT: 2g
run: ./dev/create-release/release-build.sh publish-snapshot
5 changes: 5 additions & 0 deletions binder/postBuild
@@ -20,6 +20,11 @@
# This file is used for Binder integration to install PySpark available in
# Jupyter notebook.

# SPARK-45706: Should fail fast. Otherwise, the Binder image is successfully
# built, and it cannot be rebuilt.
set -o pipefail
set -e

VERSION=$(python -c "exec(open('python/pyspark/version.py').read()); print(__version__)")
TAG=$(git describe --tags --exact-match 2>/dev/null)

47 changes: 27 additions & 20 deletions common/utils/src/main/resources/error/error-classes.json
@@ -1974,6 +1974,11 @@
"expects one of the units without quotes YEAR, QUARTER, MONTH, WEEK, DAY, DAYOFYEAR, HOUR, MINUTE, SECOND, MILLISECOND, MICROSECOND, but got the string literal <invalidValue>."
]
},
"LENGTH" : {
"message" : [
"Expects `length` greater than or equal to 0, but got <length>."
]
},
"NULL" : {
"message" : [
"expects a non-NULL value."
@@ -1989,6 +1994,11 @@
"Expects group index between 0 and <groupCount>, but got <groupIndex>."
]
},
"START" : {
"message" : [
"Expects a positive or a negative value for `start`, but got 0."
]
},
"ZERO_INDEX" : {
"message" : [
"expects %1$, %2$ and so on, but got %0$."
@@ -2192,6 +2202,12 @@
],
"sqlState" : "42K0F"
},
"INVALID_TIME_TRAVEL_SPEC" : {
"message" : [
"Cannot specify both version and timestamp when time travelling the table."
],
"sqlState" : "42K0E"
},
"INVALID_TIME_TRAVEL_TIMESTAMP_EXPR" : {
"message" : [
"The time travel timestamp expression <expr> is invalid."
@@ -2207,6 +2223,11 @@
"Must be deterministic."
]
},
"OPTION" : {
"message" : [
"Timestamp string in the options must be able to cast to TIMESTAMP type."
]
},
"UNEVALUABLE" : {
"message" : [
"Must be evaluable."
@@ -2386,6 +2407,12 @@
],
"sqlState" : "42803"
},
"MULTIPLE_TIME_TRAVEL_SPEC" : {
"message" : [
"Cannot specify time travel in both the time travel clause and options."
],
"sqlState" : "42K0E"
},
"MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : {
"message" : [
"The expression <expr> does not support more than one source."
@@ -5132,11 +5159,6 @@
"<errorMessage>"
]
},
"_LEGACY_ERROR_TEMP_1334" : {
"message" : [
"Cannot specify both version and timestamp when time travelling the table."
]
},
"_LEGACY_ERROR_TEMP_1338" : {
"message" : [
"Sinks cannot request distribution and ordering in continuous execution mode."
@@ -5737,21 +5759,6 @@
"<userClass> is not annotated with SQLUserDefinedType nor registered with UDTRegistration.}"
]
},
"_LEGACY_ERROR_TEMP_2156" : {
"message" : [
"The size function doesn't support the operand type <dataType>."
]
},
"_LEGACY_ERROR_TEMP_2157" : {
"message" : [
"Unexpected value for start in function <prettyName>: SQL array indices start at 1."
]
},
"_LEGACY_ERROR_TEMP_2158" : {
"message" : [
"Unexpected value for length in function <prettyName>: length must be greater than or equal to 0."
]
},
"_LEGACY_ERROR_TEMP_2159" : {
"message" : [
"Unsuccessful try to concat arrays with <numberOfElements> elements due to exceeding the array size limit <maxRoundedArrayLength>."
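
The new INVALID_PARAMETER_VALUE.LENGTH and INVALID_PARAMETER_VALUE.START entries take over from the removed _LEGACY_ERROR_TEMP_2157/2158 templates used by the slice-style array functions, and INVALID_TIME_TRAVEL_SPEC / MULTIPLE_TIME_TRAVEL_SPEC take over from _LEGACY_ERROR_TEMP_1334. A rough sketch of queries that would be expected to raise the new classes — the table name, the reader option keys, and the exact behaviour are assumptions for illustration, not part of this diff:

// Sketch only (not from this commit): triggering the new error classes.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// INVALID_PARAMETER_VALUE.START: SQL array indices are 1-based, so start = 0 is rejected.
spark.sql("SELECT slice(array(1, 2, 3), 0, 1)").show()

// INVALID_PARAMETER_VALUE.LENGTH: a negative length is rejected.
spark.sql("SELECT slice(array(1, 2, 3), 1, -1)").show()

// INVALID_TIME_TRAVEL_SPEC: a version and a timestamp cannot both be supplied
// (option keys and table name are illustrative assumptions).
spark.read
  .option("versionAsOf", "1")
  .option("timestampAsOf", "2023-10-27 00:00:00")
  .table("some_time_travel_table")
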
AvroDataToCatalyst.scala

@@ -61,7 +61,8 @@ private[sql] case class AvroDataToCatalyst(
@transient private lazy val reader = new GenericDatumReader[Any](actualSchema, expectedSchema)

@transient private lazy val deserializer =
new AvroDeserializer(expectedSchema, dataType, avroOptions.datetimeRebaseModeInRead)
new AvroDeserializer(expectedSchema, dataType,
avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType)

@transient private var decoder: BinaryDecoder = _

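
AvroDataToCatalyst now forwards avroOptions.useStableIdForUnionType into the deserializer, so the stable union-member naming also reaches from_avro. A minimal usage sketch, assuming a DataFrame df with a binary value column and assuming enableStableIdentifiersForUnionType is the option key backing this flag:

// Sketch only: decoding Avro bytes with stable union member names.
// df, the schema literal, and the option key are illustrative assumptions.
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.from_avro

val unionSchema =
  """{"type":"record","name":"r","fields":[{"name":"f","type":["int","string"]}]}"""
val options = Map("enableStableIdentifiersForUnionType" -> "true").asJava

val decoded = df.select(from_avro(col("value"), unionSchema, options).as("r"))
// With the flag enabled, the union branches are expected to surface as
// member_int / member_string rather than member0 / member1.
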
AvroDeserializer.scala

@@ -49,18 +49,21 @@ private[sql] class AvroDeserializer(
rootCatalystType: DataType,
positionalFieldMatch: Boolean,
datetimeRebaseSpec: RebaseSpec,
filters: StructFilters) {
filters: StructFilters,
useStableIdForUnionType: Boolean) {

def this(
rootAvroType: Schema,
rootCatalystType: DataType,
datetimeRebaseMode: String) = {
datetimeRebaseMode: String,
useStableIdForUnionType: Boolean) = {
this(
rootAvroType,
rootCatalystType,
positionalFieldMatch = false,
RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
new NoopFilters)
new NoopFilters,
useStableIdForUnionType)
}

private lazy val decimalConversions = new DecimalConversion()
@@ -118,7 +121,7 @@ private[sql] class AvroDeserializer(
val incompatibleMsg = errorPrefix +
s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"

val realDataType = SchemaConverters.toSqlType(avroType).dataType
val realDataType = SchemaConverters.toSqlType(avroType, useStableIdForUnionType).dataType
val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)

AvroFileFormat.scala

@@ -141,7 +141,8 @@ private[sql] class AvroFileFormat extends FileFormat
requiredSchema,
parsedOptions.positionalFieldMatching,
datetimeRebaseMode,
avroFilters)
avroFilters,
parsedOptions.useStableIdForUnionType)
override val stopPosition = file.start + file.length

override def hasNext: Boolean = hasNextRow
SchemaConverters.scala

@@ -46,16 +46,24 @@ object SchemaConverters {
*/
case class SchemaType(dataType: DataType, nullable: Boolean)

/**
* Converts an Avro schema to a corresponding Spark SQL schema.
*
* @since 4.0.0
*/
def toSqlType(avroSchema: Schema, useStableIdForUnionType: Boolean): SchemaType = {
toSqlTypeHelper(avroSchema, Set.empty, useStableIdForUnionType)
}
/**
* Converts an Avro schema to a corresponding Spark SQL schema.
*
* @since 2.4.0
*/
def toSqlType(avroSchema: Schema): SchemaType = {
toSqlTypeHelper(avroSchema, Set.empty, AvroOptions(Map()))
toSqlType(avroSchema, false)
}
def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType = {
toSqlTypeHelper(avroSchema, Set.empty, AvroOptions(options))
toSqlTypeHelper(avroSchema, Set.empty, AvroOptions(options).useStableIdForUnionType)
}

// The property specifies Catalyst type of the given field
@@ -64,7 +72,7 @@
private def toSqlTypeHelper(
avroSchema: Schema,
existingRecordNames: Set[String],
avroOptions: AvroOptions): SchemaType = {
useStableIdForUnionType: Boolean): SchemaType = {
avroSchema.getType match {
case INT => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false)
@@ -117,7 +125,7 @@
}
val newRecordNames = existingRecordNames + avroSchema.getFullName
val fields = avroSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames, avroOptions)
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames, useStableIdForUnionType)
StructField(f.name, schemaType.dataType, schemaType.nullable)
}

@@ -127,13 +135,14 @@
val schemaType = toSqlTypeHelper(
avroSchema.getElementType,
existingRecordNames,
avroOptions)
useStableIdForUnionType)
SchemaType(
ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
nullable = false)

case MAP =>
val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames, avroOptions)
val schemaType = toSqlTypeHelper(avroSchema.getValueType,
existingRecordNames, useStableIdForUnionType)
SchemaType(
MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
nullable = false)
@@ -143,17 +152,18 @@
// In case of a union with null, eliminate it and make a recursive call
val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
if (remainingUnionTypes.size == 1) {
toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, avroOptions)
toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, useStableIdForUnionType)
.copy(nullable = true)
} else {
toSqlTypeHelper(
Schema.createUnion(remainingUnionTypes.asJava),
existingRecordNames,
avroOptions).copy(nullable = true)
useStableIdForUnionType).copy(nullable = true)
}
} else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
case Seq(t1) =>
toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, avroOptions)
toSqlTypeHelper(avroSchema.getTypes.get(0),
existingRecordNames, useStableIdForUnionType)
case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
SchemaType(LongType, nullable = false)
case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
@@ -167,9 +177,9 @@
val fieldNameSet : mutable.Set[String] = mutable.Set()
val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
case (s, i) =>
val schemaType = toSqlTypeHelper(s, existingRecordNames, avroOptions)
val schemaType = toSqlTypeHelper(s, existingRecordNames, useStableIdForUnionType)

val fieldName = if (avroOptions.useStableIdForUnionType) {
val fieldName = if (useStableIdForUnionType) {
// Avro's field name may be case sensitive, so field names for two named type
// could be "a" and "A" and we need to distinguish them. In this case, we throw
// an exception.
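
The helper now threads a plain Boolean instead of the whole AvroOptions, and a public toSqlType(schema, useStableIdForUnionType) overload is exposed. A short sketch of calling the new overload directly; the member_&lt;type&gt; field names in the comment reflect the expected stable-id convention rather than anything shown in this hunk:

// Sketch: converting a union-bearing Avro schema with and without stable ids.
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val avroSchema = new Schema.Parser().parse(
  """{"type":"record","name":"r","fields":[{"name":"f","type":["int","string"]}]}""")

val legacy = SchemaConverters.toSqlType(avroSchema, false)
val stable = SchemaConverters.toSqlType(avroSchema, true)
// legacy.dataType is expected to name the branches member0 / member1,
// while stable.dataType uses member_int / member_string instead.
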
AvroPartitionReaderFactory.scala

@@ -103,7 +103,8 @@ case class AvroPartitionReaderFactory(
readDataSchema,
options.positionalFieldMatching,
datetimeRebaseMode,
avroFilters)
avroFilters,
options.useStableIdForUnionType)
override val stopPosition = partitionedFile.start + partitionedFile.length

override def next(): Boolean = hasNextRow
AvroCatalystDataConversionSuite.scala

@@ -59,7 +59,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite

val expected = {
val avroSchema = new Schema.Parser().parse(schema)
SchemaConverters.toSqlType(avroSchema).dataType match {
SchemaConverters.toSqlType(avroSchema, false).dataType match {
case st: StructType => Row.fromSeq((0 until st.length).map(_ => null))
case _ => null
}
@@ -281,13 +281,14 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
data: GenericData.Record,
expected: Option[Any],
filters: StructFilters = new NoopFilters): Unit = {
val dataType = SchemaConverters.toSqlType(schema).dataType
val dataType = SchemaConverters.toSqlType(schema, false).dataType
val deserializer = new AvroDeserializer(
schema,
dataType,
false,
RebaseSpec(LegacyBehaviorPolicy.CORRECTED),
filters)
filters,
false)
val deserialized = deserializer.deserialize(data)
expected match {
case None => assert(deserialized == None)
AvroRowReaderSuite.scala

@@ -75,7 +75,8 @@ class AvroRowReaderSuite
StructType(new StructField("value", IntegerType, true) :: Nil),
false,
RebaseSpec(CORRECTED),
new NoopFilters)
new NoopFilters,
false)
override val stopPosition = fileSize

override def hasNext: Boolean = hasNextRow
AvroSerdeSuite.scala

@@ -226,7 +226,8 @@ object AvroSerdeSuite {
sql,
isPositional(matchType),
RebaseSpec(CORRECTED),
new NoopFilters)
new NoopFilters,
false)
}

/**
AvroSuite.scala

@@ -2147,7 +2147,7 @@ abstract class AvroSuite

private def checkSchemaWithRecursiveLoop(avroSchema: String): Unit = {
val message = intercept[IncompatibleSchemaException] {
SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema), false)
}.getMessage

assert(message.contains("Found recursive reference in Avro schema"))
SparkSession.scala (Spark Connect Scala client)

@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BoxedLongEncoder
import org.apache.spark.sql.connect.client.{ClassFinder, SparkConnectClient, SparkResult}
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
import org.apache.spark.sql.connect.client.util.Cleaner
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.{CatalogImpl, SqlApiConf}
import org.apache.spark.sql.streaming.DataStreamReader
@@ -66,7 +65,6 @@ import org.apache.spark.sql.types.StructType
*/
class SparkSession private[sql] (
private[sql] val client: SparkConnectClient,
private val cleaner: Cleaner,
private val planIdGenerator: AtomicLong)
extends Serializable
with Closeable
@@ -536,7 +534,6 @@ class SparkSession private[sql] (
private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = {
val value = client.execute(plan)
val result = new SparkResult(value, allocator, encoder, timeZoneId)
cleaner.register(result)
result
}

@@ -774,7 +771,7 @@ object SparkSession extends Logging {
* Create a new [[SparkSession]] based on the connect client [[Configuration]].
*/
private[sql] def create(configuration: Configuration): SparkSession = {
new SparkSession(configuration.toSparkConnectClient, cleaner, planIdGenerator)
new SparkSession(configuration.toSparkConnectClient, planIdGenerator)
}

/**
@@ -795,12 +792,6 @@
*/
def builder(): Builder = new Builder()

private[sql] lazy val cleaner = {
val cleaner = new Cleaner
cleaner.start()
cleaner
}

class Builder() extends Logging {
// Initialize the connection string of the Spark Connect client builder from SPARK_REMOTE
// by default, if it exists. The connection string can be overridden using
@@ -911,7 +902,7 @@ object SparkSession extends Logging {

private def tryCreateSessionFromClient(): Option[SparkSession] = {
if (client != null) {
Option(new SparkSession(client, cleaner, planIdGenerator))
Option(new SparkSession(client, planIdGenerator))
} else {
None
}