
Commit 059a8e6 (parent 0f9b7cf)

typelevel#787 - remove all sql package private code

File tree: 12 files changed (+79 -55 lines)


build.sbt (+1 -1)

@@ -17,7 +17,7 @@ val shimVersion = "0.0.1-SNAPSHOT"
 val Scala212 = "2.12.19"
 val Scala213 = "2.13.13"
 
-//resolvers in Global += Resolver.mavenLocal
+resolvers in Global += Resolver.mavenLocal
 resolvers in Global += MavenRepository(
   "sonatype-s01-snapshots",
   Resolver.SonatypeS01RepositoryRoot + "/snapshots"
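
This uncomments the local resolver, presumably so a locally published com.sparkutils shim snapshot (shimVersion is 0.0.1-SNAPSHOT) can be resolved during the migration. For reference, a sketch of the same setting in sbt 1.x slash syntax (the `in` form above still compiles but is deprecated):

```scala
// Resolver.mavenLocal resolves against the local Maven cache (~/.m2/repository),
// which is where locally installed shim snapshots land.
Global / resolvers += Resolver.mavenLocal
```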

dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala → dataset/src/main/scala/frameless/FramelessInternals.scala (+10 -15)

@@ -1,20 +1,15 @@
-package org.apache.spark.sql
+package frameless
 
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import com.sparkutils.shim.expressions.{
-  Alias2 => Alias,
-  CreateStruct1 => CreateStruct
-}
-import org.apache.spark.sql.catalyst.expressions.{
-  Expression,
-  NamedExpression,
-  NonSQLExpression
-}
+import com.sparkutils.shim.expressions.{Alias2 => Alias, CreateStruct1 => CreateStruct}
+import org.apache.spark.sql.shim.{utils => shimUtils}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.plans.logical.{ LogicalPlan, Project }
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, NonSQLExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.types.ObjectType
+import org.apache.spark.sql._
+
 import scala.reflect.ClassTag
 
 object FramelessInternals {
@@ -36,7 +31,7 @@ object FramelessInternals {
 
   def expr(column: Column): Expression = column.expr
 
-  def logicalPlan(ds: Dataset[_]): LogicalPlan = ds.logicalPlan
+  def logicalPlan(ds: Dataset[_]): LogicalPlan = shimUtils.logicalPlan(ds)
 
   def executePlan(ds: Dataset[_], plan: LogicalPlan): QueryExecution =
     ds.sparkSession.sessionState.executePlan(plan)
@@ -68,7 +63,7 @@ object FramelessInternals {
     new Dataset(sqlContext, plan, encoder)
 
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
-    Dataset.ofRows(sparkSession, logicalPlan)
+    shimUtils.ofRows(sparkSession, logicalPlan)
 
   // because org.apache.spark.sql.types.UserDefinedType is private[spark]
   type UserDefinedType[A >: Null] =
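
The shim indirection is the point of this rename: `Dataset.ofRows` and `Dataset#logicalPlan` are `private[sql]`, so once FramelessInternals leaves the org.apache.spark.sql package it can no longer touch them directly. A minimal sketch of the kind of forwarder `org.apache.spark.sql.shim.utils` provides (illustrative only; the real code lives in the com.sparkutils shim library):

```scala
// A forwarder compiled inside the org.apache.spark.sql package hierarchy can
// legally access private[sql] members and re-export them to outside callers.
package org.apache.spark.sql.shim

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object utils {

  // Dataset#logicalPlan is private[sql]; visible here via the enclosing package.
  def logicalPlan(ds: Dataset[_]): LogicalPlan = ds.logicalPlan

  // Dataset.ofRows is private[sql] on the companion object.
  def ofRows(spark: SparkSession, plan: LogicalPlan): DataFrame =
    Dataset.ofRows(spark, plan)
}
```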

dataset/src/main/scala/frameless/RecordEncoder.scala (-1)

@@ -7,7 +7,6 @@ import com.sparkutils.shim.expressions.{
   WrapOption2 => WrapOption
 }
 import com.sparkutils.shim.{ deriveUnitLiteral, ifIsNull }
-import org.apache.spark.sql.FramelessInternals
 import org.apache.spark.sql.catalyst.expressions.{ Expression, Literal }
 import org.apache.spark.sql.shim.{
   Invoke5 => Invoke,

dataset/src/main/scala/frameless/TypedColumn.scala (+1 -1)

@@ -8,7 +8,7 @@ import org.apache.spark.sql.catalyst.expressions.{
   Literal
 } // 787 - Spark 4 source code compat
 import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.sql.{ Column, FramelessInternals }
+import org.apache.spark.sql.Column
 
 import shapeless._
 import shapeless.ops.record.Selector

dataset/src/main/scala/frameless/TypedDataset.scala (+1 -1)

@@ -4,7 +4,7 @@ import java.util
 import frameless.functions.CatalystExplodableCollection
 import frameless.ops._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Column, DataFrame, Dataset, FramelessInternals, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
 import org.apache.spark.sql.catalyst.plans.Inner

dataset/src/main/scala/frameless/TypedEncoder.scala (+2 -4)

@@ -5,10 +5,7 @@ import java.util.Date
 import java.time.{ Duration, Instant, LocalDate, Period }
 import java.sql.Timestamp
 import scala.reflect.ClassTag
-
-import org.apache.spark.sql.FramelessInternals
-import org.apache.spark.sql.FramelessInternals.UserDefinedType
-import org.apache.spark.sql.{ reflection => ScalaReflection }
+import FramelessInternals.UserDefinedType
 import org.apache.spark.sql.catalyst.expressions.{ Expression, UnsafeArrayData, Literal }
 import org.apache.spark.sql.catalyst.util.{
   ArrayBasedMapData,
@@ -26,6 +23,7 @@ import com.sparkutils.shim.expressions.{
   MapObjects5 => MapObjects,
   ExternalMapToCatalyst7 => ExternalMapToCatalyst
 }
+import frameless.{reflection => ScalaReflection}
 import org.apache.spark.sql.shim.{
   StaticInvoke4 => StaticInvoke,
   NewInstance4 => NewInstance,

dataset/src/main/scala/frameless/functions/AggregateFunctions.scala (+1 -1)

@@ -1,7 +1,7 @@
 package frameless
 package functions
 
-import org.apache.spark.sql.FramelessInternals.expr
+import FramelessInternals.expr
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.{ functions => sparkFunctions }
 import frameless.syntax._

dataset/src/main/scala/frameless/functions/package.scala (+1 -2)

@@ -1,13 +1,12 @@
 package frameless
 
+import frameless.{reflection => ScalaReflection}
 import scala.reflect.ClassTag
 
 import shapeless._
 import shapeless.labelled.FieldType
 import shapeless.ops.hlist.IsHCons
 import shapeless.ops.record.{ Keys, Values }
-
-import org.apache.spark.sql.{ reflection => ScalaReflection }
 import org.apache.spark.sql.catalyst.expressions.Literal
 
 package object functions extends Udf with UnaryFunctions {

dataset/src/main/scala/frameless/ops/GroupByOps.scala (+1 -1)

@@ -6,7 +6,6 @@ import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.{
   Column,
   Dataset,
-  FramelessInternals,
   RelationalGroupedDataset
 }
 import shapeless._
@@ -19,6 +18,7 @@ import shapeless.ops.hlist.{
   Tupler
 }
 import com.sparkutils.shim.expressions.{ MapGroups4 => MapGroups }
+import frameless.FramelessInternals
 
 class GroupedByManyOps[T, TK <: HList, K <: HList, KT](
   self: TypedDataset[T],

dataset/src/main/scala/org/apache/spark/sql/reflection/package.scala → dataset/src/main/scala/frameless/reflection/package.scala (+56 -23)

@@ -1,26 +1,6 @@
-package org.apache.spark.sql
+package frameless
 
-import org.apache.spark.sql.catalyst.ScalaReflection.{
-  cleanUpReflectionObjects,
-  getClassFromType,
-  localTypeOf
-}
-import org.apache.spark.sql.types.{
-  BinaryType,
-  BooleanType,
-  ByteType,
-  CalendarIntervalType,
-  DataType,
-  Decimal,
-  DecimalType,
-  DoubleType,
-  FloatType,
-  IntegerType,
-  LongType,
-  NullType,
-  ObjectType,
-  ShortType
-}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
@@ -45,6 +25,59 @@ package object reflection {
 
   import universe._
 
+  // Since we are creating a runtime mirror using the class loader of the current
+  // thread, we need a def here, so that every call to mirror uses the class
+  // loader of the current thread.
+  def mirror: universe.Mirror = {
+    universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
+  }
+
+  /**
+   * Any code calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to
+   * clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to
+   * `scala.reflect.runtime.JavaUniverse.undoLog`.
+   *
+   * @see https://github.com/scala/bug/issues/8302
+   */
+  def cleanUpReflectionObjects[T](func: => T): T = {
+    universe.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func)
+  }
+
+  /**
+   * Return the Scala Type for `T` in the current classloader mirror.
+   *
+   * Use this method instead of the convenience method `universe.typeOf`, which
+   * assumes that all types can be found in the classloader that loaded scala-reflect classes.
+   * That's not necessarily the case when running using Eclipse launchers or even
+   * Sbt console or test (without `fork := true`).
+   *
+   * @see SPARK-5281
+   */
+  def localTypeOf[T: TypeTag]: `Type` = {
+    val tag = implicitly[TypeTag[T]]
+    tag.in(mirror).tpe.dealias
+  }
+
+  /*
+   * Retrieves the runtime class corresponding to the provided type.
+   */
+  def getClassFromType(tpe: Type): Class[_] =
+    mirror.runtimeClass(erasure(tpe).dealias.typeSymbol.asClass)
+
+  private def erasure(tpe: Type): Type = {
+    // For user-defined AnyVal classes, we should not erase them. Otherwise, they
+    // resolve to the underlying wrapped type, e.g. erasure of
+    // `case class Foo(i: Int) extends AnyVal` would return type `Int` instead of `Foo`.
+    // But for other types, we do need erasure. For example, we need to erase
+    // `scala.Any` to `java.lang.Object` in order to load it from the Java ClassLoader.
+    // Please see SPARK-17368 & SPARK-31190 for more details.
+    if (isSubtype(tpe, localTypeOf[AnyVal]) && !tpe.toString.startsWith("scala")) {
+      tpe
+    } else {
+      tpe.erasure
+    }
+  }
+
   /**
    * Returns the Spark SQL DataType for a given scala type. Where this is not an exact mapping
    * to a native type, an ObjectType is returned. Special handling is also used for Arrays including
@@ -62,7 +95,7 @@ package object reflection {
   *
   * See https://github.com/scala/bug/issues/10766
   */
-  private[sql] def isSubtype(tpe1: `Type`, tpe2: `Type`): Boolean = {
+  private def isSubtype(tpe1: `Type`, tpe2: `Type`): Boolean = {
    ScalaSubtypeLock.synchronized {
      tpe1 <:< tpe2
    }
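
Inlining these helpers (previously imported from Spark's private catalyst ScalaReflection) makes the package self-contained. A short usage sketch of the now-public surface, with a hypothetical caller type, assuming the frameless.reflection package object above:

```scala
import frameless.reflection._

case class Person(name: String, age: Int)

// Resolve the scala-reflect Type through the current thread's context classloader...
val tpe = localTypeOf[Person]

// ...and recover the runtime class; erasure is skipped for user-defined AnyVals.
val cls: Class[_] = getClassFromType(tpe)

// <:< checks should be wrapped so reflection objects don't leak into the undoLog.
val isAnyVal = cleanUpReflectionObjects { tpe <:< localTypeOf[AnyVal] }
```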

dataset/src/test/scala/frameless/UdtEncodedClass.scala (+1 -1)

@@ -3,7 +3,7 @@ package frameless
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.FramelessInternals.UserDefinedType
+import FramelessInternals.UserDefinedType
 
 @SQLUserDefinedType(udt = classOf[UdtEncodedClassUdt])
 class UdtEncodedClass(val a: Int, val b: Array[Double]) {
ml package object (+3 -3)

@@ -1,13 +1,13 @@
 package frameless
 
-import org.apache.spark.sql.FramelessInternals.UserDefinedType
-import org.apache.spark.ml.FramelessInternals
+import FramelessInternals.UserDefinedType
+import org.apache.spark.ml.{FramelessInternals => MLFramelessInternals}
 import org.apache.spark.ml.linalg.{Matrix, Vector}
 
 package object ml {
 
-  implicit val mlVectorUdt: UserDefinedType[Vector] = FramelessInternals.vectorUdt
+  implicit val mlVectorUdt: UserDefinedType[Vector] = MLFramelessInternals.vectorUdt
 
-  implicit val mlMatrixUdt: UserDefinedType[Matrix] = FramelessInternals.matrixUdt
+  implicit val mlMatrixUdt: UserDefinedType[Matrix] = MLFramelessInternals.matrixUdt
 
 }
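
The import alias here is doing real work: after the move, frameless.FramelessInternals sits in the same package as this file, so an unrenamed import of org.apache.spark.ml.FramelessInternals would shadow it ambiguously. The same rename-on-import mechanism in a standalone sketch (standard-library names, not from this repo):

```scala
// Two same-named types: bind one to an alias at the import site so both
// definitions remain reachable in a single file.
import scala.collection.mutable.{Map => MutableMap}

val cache: MutableMap[String, Int] = MutableMap.empty // mutable.Map via the alias
val snapshot: Map[String, Int] = cache.toMap          // plain immutable Map
```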
