[SPARK-46118][SQL][SS][CONNECT] Use `SparkSession.sessionState.conf` instead of `SQLContext.conf` and mark `SQLContext.conf` as deprecated

### What changes were proposed in this pull request?
There are a number of places in the Spark code base that call `SparkSession.sqlContext.conf`, which is equivalent to `SparkSession.sqlContext.sparkSession.sessionState.conf`. This PR changes them to call `SparkSession.sessionState.conf` directly, or to expand to `SQLContext.sparkSession.sessionState.conf` where only a `SQLContext` is in scope.

At the same time, this PR marks the internal API `SQLContext.conf` as deprecated; `SparkSession.sessionState.conf` should be used instead.
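
A minimal sketch of the rewrite pattern (a hypothetical helper, not code from this PR; `spark.sql.selfJoinAutoResolveAmbiguity` is just one example of a setting read through `SQLConf`, and `sessionState` is an internal/unstable API):

```scala
import org.apache.spark.sql.SparkSession

object ConfAccessExample {
  // Hypothetical helper showing the mechanical rewrite. The "before" form is
  // left as a comment because `SQLContext.conf` is `private[sql]` and only
  // reachable from Spark's own code, which is exactly what this PR touches.
  def selfJoinAutoResolve(spark: SparkSession): Boolean = {
    // Before: spark.sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity
    // After: read the same SQLConf directly from the session state.
    spark.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity
  }
}
```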

### Why are the changes needed?
1. `SparkSession.sessionState.conf` has a shallower call chain than `SparkSession.sqlContext.conf` (see the sketch after this list).
2. `SQLContext` has been marked as deprecated since Apache Spark 1.6, and its APIs should be avoided in Spark's internal code wherever possible.
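
For illustration, here is a self-contained sketch using stub types (not the real Spark classes; only the member names mirror Spark's) of why the two expressions yield the same `SQLConf` and why the new form is one hop shorter:

```scala
// Stub types for illustration; only the member names mirror the real classes.
class SQLConf

class SessionState {
  val conf: SQLConf = new SQLConf
}

class SparkSession {
  val sessionState: SessionState = new SessionState
  lazy val sqlContext: SQLContext = new SQLContext(this)
}

class SQLContext(val sparkSession: SparkSession) {
  // The real (private[sql]) accessor now carries the same deprecation notice.
  @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
  def conf: SQLConf = sparkSession.sessionState.conf
}

object CallChainDemo {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession
    // Old chain: SparkSession -> SQLContext -> SparkSession -> SessionState -> SQLConf
    val viaSqlContext = spark.sqlContext.conf // compiles, but with a deprecation warning
    // New chain: SparkSession -> SessionState -> SQLConf
    val viaSessionState = spark.sessionState.conf
    // Both paths end at the same SQLConf instance.
    assert(viaSqlContext eq viaSessionState)
  }
}
```

Both accessors are internal Spark API, so the rewrite is invisible to end users, consistent with the "no user-facing change" answer below.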

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passed existing GitHub Actions checks.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44034 from LuciferYang/sc-conf.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
a0x8o and LuciferYang committed Nov 27, 2023
1 parent 5b57ffa commit c28593b
Showing 38 changed files with 72 additions and 64 deletions.

@@ -2235,7 +2235,7 @@ class SparkConnectPlanner(

JoinWith.typedJoinWith(
joined,
session.sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity,
session.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity,
session.sessionState.analyzer.resolver,
rel.getJoinDataType.getIsLeftStruct,
rel.getJoinDataType.getIsRightStruct)

@@ -18,8 +18,8 @@ package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim,
PersistentVolumeClaimBuilder, PodSpec, PodSpecBuilder, PodTemplateSpec}

@@ -124,7 +124,7 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
podsAllocatorUnderTest = new StatefulSetPodsAllocator(
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, null)
when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty)
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
}


4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1244,7 +1244,7 @@ class Dataset[T] private[sql](

withTypedPlan(JoinWith.typedJoinWith(
joined,
sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity,
sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity,
sparkSession.sessionState.analyzer.resolver,
this.exprEnc.isSerializedAsStructForTopLevel,
other.exprEnc.isSerializedAsStructForTopLevel))
@@ -1450,7 +1450,7 @@ class Dataset[T] private[sql](
case "*" =>
Column(ResolvedStar(queryExecution.analyzed.output))
case _ =>
if (sqlContext.conf.supportQuotedRegexColumnName) {
if (sparkSession.sessionState.conf.supportQuotedRegexColumnName) {
colRegex(colName)
} else {
Column(addDataFrameIdToCol(resolve(colName)))

@@ -78,6 +78,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)

private[sql] def sessionState: SessionState = sparkSession.sessionState
private[sql] def sharedState: SharedState = sparkSession.sharedState
@deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
private[sql] def conf: SQLConf = sessionState.conf

def sparkContext: SparkContext = sparkSession.sparkContext

@@ -194,7 +194,7 @@ case class CreateDataSourceTableAsSelectCommand(

result match {
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
sparkSession.sqlContext.conf.manageFilesourcePartitions =>
sparkSession.sessionState.conf.manageFilesourcePartitions =>
// Need to recover partitions into the metastore so our saved data is visible.
sessionState.executePlan(RepairTableCommand(
table.identifier,

@@ -715,7 +715,7 @@ case class RepairTableCommand(
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")

val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
val partitionStats = if (spark.sessionState.conf.gatherFastStats) {
gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
} else {
Map.empty[Path, PartitionStatistics]
@@ -957,7 +957,7 @@ object DDLUtils extends Logging {
def verifyPartitionProviderIsHive(
spark: SparkSession, table: CatalogTable, action: String): Unit = {
val tableName = table.identifier.table
if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
if (!spark.sessionState.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
throw QueryCompilationErrors
.actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(action, tableName)
}

@@ -390,7 +390,7 @@ case class DataSource(

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
val useCatalogFileIndex = sparkSession.sessionState.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
catalogTable.get.partitionColumnNames.nonEmpty
val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {

@@ -41,12 +41,12 @@ object FileStatusCache {
* shared across all clients.
*/
def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
if (session.sqlContext.conf.manageFilesourcePartitions &&
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
if (session.sessionState.conf.manageFilesourcePartitions &&
session.sessionState.conf.filesourcePartitionFileCacheSize > 0) {
if (sharedCache == null) {
sharedCache = new SharedInMemoryCache(
session.sqlContext.conf.filesourcePartitionFileCacheSize,
session.sqlContext.conf.metadataCacheTTL
session.sessionState.conf.filesourcePartitionFileCacheSize,
session.sessionState.conf.metadataCacheTTL
)
}
sharedCache.createForNewClient()

@@ -65,7 +65,7 @@ case class HadoopFsRelation(
}

override def sizeInBytes: Long = {
val compressionFactor = sqlContext.conf.fileCompressionFactor
val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
(location.sizeInBytes * compressionFactor).toLong
}


@@ -157,8 +157,8 @@ abstract class PartitioningAwareFileIndex(
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths,
userSpecifiedSchema = userSpecifiedSchema,
caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns,
timeZoneId = timeZoneId)
}
}

@@ -32,11 +32,12 @@ class JdbcRelationProvider extends CreatableRelationProvider
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val jdbcOptions = new JDBCOptions(parameters)
val resolver = sqlContext.conf.resolver
val timeZoneId = sqlContext.conf.sessionLocalTimeZone
val sparkSession = sqlContext.sparkSession
val resolver = sparkSession.sessionState.conf.resolver
val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
JDBCRelation(schema, parts, jdbcOptions)(sparkSession)
}

override def createRelation(
@@ -45,7 +46,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val isCaseSensitive = sqlContext.sparkSession.sessionState.conf.caseSensitiveAnalysis
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)
try {

@@ -66,7 +66,7 @@ class MicroBatchExecution(
// When the flag is disabled, Spark will fall back to single batch execution, whenever
// it figures out any source does not support Trigger.AvailableNow.
// See SPARK-45178 for more details.
if (sparkSession.sqlContext.conf.getConf(
if (sparkSession.sessionState.conf.getConf(
SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
logInfo("Configured to use the wrapper of Trigger.AvailableNow for query " +
s"$prettyIdString.")
@@ -113,7 +113,7 @@ class MicroBatchExecution(
// transformation is responsible for replacing attributes with their final values.

val disabledSources =
Utils.stringToSeq(sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders)
Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
@@ -144,7 +144,7 @@ class MicroBatchExecution(
})
} else if (v1.isEmpty) {
throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
srcName, sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders, table)
srcName, sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders, table)
} else {
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch

@@ -145,7 +145,7 @@ trait ProgressReporter extends Logging {
private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
progressBuffer.synchronized {
progressBuffer += newProgress
while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
progressBuffer.dequeue()
}
}

@@ -124,7 +124,7 @@ private[continuous] class EpochCoordinator(
extends ThreadSafeRpcEndpoint with Logging {

private val epochBacklogQueueSize =
session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize
session.sessionState.conf.continuousStreamingEpochBacklogQueueSize

private var queryWritesStopped: Boolean = false


@@ -206,7 +206,7 @@ abstract class BaseRelation {
*
* @since 1.3.0
*/
def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
def sizeInBytes: Long = sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes

/**
* Whether does it need to convert the objects in Row to internal representation, for example:

@@ -156,7 +156,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
extraOptions + ("path" -> path.get)
}

val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).
val ds = DataSource.lookupDataSource(source, sparkSession.sessionState.conf).
getConstructor().newInstance()
// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
// We can't be sure at this point whether we'll actually want to use V2, since we don't know the

@@ -369,7 +369,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
} else {
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val disabledSources =
Utils.stringToSeq(df.sparkSession.sqlContext.conf.disabledV2StreamingWriters)
Utils.stringToSeq(df.sparkSession.sessionState.conf.disabledV2StreamingWriters)
val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
// file source v2 does not support streaming yet.
classOf[FileDataSourceV2].isAssignableFrom(cls)

@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
List.fill(n)(ROW).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
val file = TestUtils.listDirectory(dir).head

val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file, null)
@@ -91,7 +91,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
data.repartition(1).write.parquet(dir.getCanonicalPath)
val file = TestUtils.listDirectory(dir).head

val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file, null)
@@ -125,7 +125,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
val file = TestUtils.listDirectory(dir).head

val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file, null /* set columns to null to project all columns */)

@@ -1375,7 +1375,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
val file = TestUtils.listDirectory(dir).head;
{
val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
@@ -1394,7 +1394,7 @@

// Project just one column
{
val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
@@ -1412,7 +1412,7 @@

// Project columns in opposite order
{
val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
@@ -1431,7 +1431,7 @@

// Empty projection
{
val conf = sqlContext.conf
val conf = spark.sessionState.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
@@ -1473,7 +1473,7 @@

dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
val conf = sqlContext.conf
val conf = spark.sessionState.conf
val vectorizedReader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
val partitionValues = new GenericInternalRow(Array(v))

@@ -29,7 +29,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
import testImplicits._

test("RatePerMicroBatchProvider in registry") {
val ds = DataSource.lookupDataSource("rate-micro-batch", spark.sqlContext.conf)
val ds = DataSource.lookupDataSource("rate-micro-batch", spark.sessionState.conf)
.getConstructor().newInstance()
assert(ds.isInstanceOf[RatePerMicroBatchProvider], "Could not find rate-micro-batch source")
}

@@ -56,15 +56,15 @@ class RateStreamProviderSuite extends StreamTest {
}

test("RateStreamProvider in registry") {
val ds = DataSource.lookupDataSource("rate", spark.sqlContext.conf)
val ds = DataSource.lookupDataSource("rate", spark.sessionState.conf)
.getConstructor().newInstance()
assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
}

test("compatible with old path in registry") {
val ds = DataSource.lookupDataSource(
"org.apache.spark.sql.execution.streaming.RateSourceProvider",
spark.sqlContext.conf).getConstructor().newInstance()
spark.sessionState.conf).getConstructor().newInstance()
assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
}


@@ -87,7 +87,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
test("backward compatibility with old path") {
val ds = DataSource.lookupDataSource(
"org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
spark.sqlContext.conf).getConstructor().newInstance()
spark.sessionState.conf).getConstructor().newInstance()
assert(ds.isInstanceOf[TextSocketSourceProvider], "Could not find socket source")
}


@@ -317,7 +317,7 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
withTempDir { file =>
withSQLConf(SQLConf.STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.key ->
skipNullsForStreamStreamJoins.toString) {
val storeConf = new StateStoreConf(spark.sqlContext.conf)
val storeConf = new StateStoreConf(spark.sessionState.conf)
val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath, UUID.randomUUID, 0, 0, 5)
val manager = new SymmetricHashJoinStateManager(
LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo), storeConf, new Configuration,

@@ -1329,7 +1329,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
val schema = JdbcUtils.schemaString(
df.schema,
df.sqlContext.conf.caseSensitiveAnalysis,
df.sparkSession.sessionState.conf.caseSensitiveAnalysis,
"jdbc:mysql://localhost:3306/temp")
assert(schema.contains("`order` LONGTEXT"))
}

@@ -417,7 +417,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {

assert(JdbcUtils.schemaString(
schema,
spark.sqlContext.conf.caseSensitiveAnalysis,
spark.sessionState.conf.caseSensitiveAnalysis,
url1,
Option(createTableColTypes)) == expectedSchemaStr)
}

@@ -449,7 +449,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
joined.sort("bucketed_table1.k", "bucketed_table2.k"),
df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))

val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
val joinOperator = if (joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
assert(executedPlan.isInstanceOf[SortMergeJoinExec])

@@ -1317,7 +1317,7 @@ class StreamSuite extends StreamTest {
.map(_.asInstanceOf[RepartitionByExpression].numPartitions)
// Before the fix of SPARK-34482, the numPartition is the value of
// `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`.
assert(numPartition.get === spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
assert(numPartition.get === spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
} finally {
if (query != null) {
query.stop()

@@ -708,7 +708,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
.groupBy("group")
.agg(collect_list("value"))
testStream(df, outputMode = OutputMode.Update)(
AddData(input, (1 to spark.sqlContext.conf.objectAggSortBasedFallbackThreshold): _*),
AddData(input, (1 to spark.sessionState.conf.objectAggSortBasedFallbackThreshold): _*),
AssertOnQuery { q =>
q.processAllAvailable()
true

@@ -618,7 +618,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
}
}

val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
val numPartitions = spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)

assert(query.lastExecution.executedPlan.collect {
case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,