@@ -311,7 +311,7 @@ class RelationalGroupedDataset protected[sql](
    */
   def pivot(pivotColumn: String): RelationalGroupedDataset = {
     // This is to prevent unintended OOM errors when the number of distinct values is large
-    val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+    val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
     // Get the distinct values of the column and sort them so its consistent
     val values = df.select(pivotColumn)
       .distinct()
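Every hunk in this diff applies the same substitution: instead of going through the public RuntimeConfig wrapper, values are read via typed accessors on the internal SQLConf reached through sessionState. A minimal sketch of the contrast, assuming it compiles inside Spark's own org.apache.spark.sql package (sessionState is private[sql]); the object and method names are hypothetical:

// Sketch only: contrasting the two ways of reading a SQL setting. Compile inside
// Spark's org.apache.spark.sql package; ConfAccessSketch is a hypothetical helper.
package org.apache.spark.sql

import org.apache.spark.sql.internal.SQLConf

private[sql] object ConfAccessSketch {
  def maxPivotValues(spark: SparkSession): Int = {
    // Public route: string-typed lookup by key through RuntimeConfig.
    val viaPublicConf = spark.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES.key).toInt
    // Route used throughout this PR: typed accessor on the session's internal SQLConf.
    val viaAccessor = spark.sessionState.conf.dataFramePivotMaxValues
    assert(viaPublicConf == viaAccessor)  // both read the same underlying conf
    viaAccessor
  }
}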
@@ -55,7 +55,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }

   def assertSupported(): Unit = {
-    if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
       UnsupportedOperationChecker.checkForBatch(analyzed)
     }
   }
@@ -216,7 +216,7 @@ case class DataSource(
           }
         }

-        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+        val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
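Context for the isSchemaInferenceEnabled check above: the flag is off by default, so file stream sources other than text must be given a schema explicitly, while enabling it lets Spark infer one when the query starts. A minimal spark-shell style sketch, with an illustrative path and schema:

// Paste into spark-shell; the path and schema are illustrative only.
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Default behaviour: a non-text file stream source needs an explicit schema.
val eventSchema = new StructType().add("id", LongType).add("name", StringType)
val explicitDf = spark.readStream.schema(eventSchema).json("/data/stream/json")

// Opt in to inference instead; Spark then infers the schema when the query starts.
spark.conf.set("spark.sql.streaming.schemaInference", "true")
val inferredDf = spark.readStream.json("/data/stream/json")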
@@ -131,7 +131,7 @@ case class InsertIntoHadoopFsRelationCommand(
           dataColumns = dataColumns,
           inputSchema = query.output,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
-          sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
+          sparkSession.sessionState.conf.partitionMaxFiles,
           isAppend)
       }

@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileCatalog(
       PartitioningUtils.parsePartitions(
         leafDirs,
         PartitioningUtils.DEFAULT_PARTITION_NAME,
-        typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+        typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
         basePaths = basePaths)
     }
   }
@@ -152,7 +152,7 @@ class ParquetFileFormat
     // Should we merge schemas from all Parquet part-files?
     val shouldMergeSchemas = parquetOptions.mergeSchema

-    val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+    val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries

     val filesByType = splitFiles(files)

@@ -309,14 +309,14 @@ class ParquetFileFormat
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)

     // Try to push down filters when filter push-down is enabled.
     val pushed =
-      if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+      if (sparkSession.sessionState.conf.parquetFilterPushDown) {
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
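The three flags read above back user-facing Parquet settings. A brief spark-shell style sketch of toggling them per session; the key names are the documented Spark 2.x ones and the path is illustrative:

// Paste into spark-shell; reads a hypothetical partitioned Parquet path.
// Predicate pushdown into Parquet is on by default; set here for completeness.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
// Treat unannotated BINARY columns as strings (compatibility with older writers).
spark.conf.set("spark.sql.parquet.binaryAsString", "false")
// Interpret INT96 values as timestamps, as Hive and Impala write them.
spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")

val events = spark.read.parquet("/data/events")
events.filter(events("eventId") > 100L).show()  // predicate eligible for pushdown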
@@ -52,7 +52,7 @@ private[parquet] class ParquetOptions(
   val mergeSchema: Boolean = parameters
     .get(MERGE_SCHEMA)
     .map(_.toBoolean)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+    .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 }

@@ -93,11 +93,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
    * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
    * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
    */
-  private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay

-  private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+  private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion

-  private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")
@@ -58,7 +58,7 @@ class StreamExecution(

   import org.apache.spark.sql.streaming.StreamingQueryListener._

-  private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
@@ -24,11 +24,9 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex

   def this() = this(new SQLConf)

-  import SQLConf._
+  val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot

-  val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
-
-  val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
 }

 private[streaming] object StateStoreConf {
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: 42 changes (31 additions, 11 deletions)
@@ -322,11 +322,6 @@ object SQLConf {
     .intConf
     .createWithDefault(4000)

-  val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
-    .doc("When true, automatically discover data partitions.")
-    .booleanConf
-    .createWithDefault(true)
-
   val PARTITION_COLUMN_TYPE_INFERENCE =
Review thread on the removed PARTITION_DISCOVERY_ENABLED entry:

HyukjinKwon (Member, Author) commented on Aug 17, 2016:
It seems we always enable this, and this option is not referenced anywhere(?). Is it intentionally left unused?

HyukjinKwon (Member, Author) commented on Aug 17, 2016:
It seems the usage was removed during refactoring.

Reviewer (Member) commented:
Could you do me a favor and check which PR removed the usage?

Reviewer (Member) commented:
It sounds like we still keep another SQLConf entry, PARALLEL_PARTITION_DISCOVERY_THRESHOLD. However, its description is no longer accurate after the refactoring. Could you update that description too?

Reviewer (Member) commented:
PR #11509 removed the usage of PARTITION_DISCOVERY_ENABLED. I think it is safe to remove this conf now.
     SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -374,8 +369,10 @@ object SQLConf {

   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The degree of parallelism for schema merging and partition discovery of " +
-        "Parquet data sources.")
+      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
+        "of detected files exceeds this value during partition discovery, it tries to list the " +
+        "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
+        "LibSVM data sources.")
       .intConf
       .createWithDefault(32)

@@ -571,8 +568,24 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

+  def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+
+  def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

+  def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+
+  def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
+
+  def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+
+  def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
+
+  def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
+
+  def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
+
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
@@ -632,6 +645,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)

+  def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
+
+  def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
+  def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
+
   def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)

   def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
@@ -648,12 +667,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def convertCTAS: Boolean = getConf(CONVERT_CTAS)

-  def partitionDiscoveryEnabled(): Boolean =
-    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
-
-  def partitionColumnTypeInferenceEnabled(): Boolean =
+  def partitionColumnTypeInferenceEnabled: Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)

+  def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
+
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

@@ -672,6 +690,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)

+  def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
+
   override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

   def vectorizedAggregateMapMaxColumns: Int = getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS)
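Each accessor added to SQLConf above is a thin, typed wrapper over getConf on the matching ConfigEntry. The following self-contained sketch imitates that pattern; MiniSQLConf and this ConfigEntry are simplified stand-ins rather than Spark's real classes, and the key and default mirror spark.sql.pivotMaxValues only for illustration.

// Simplified stand-in for the ConfigEntry + typed-accessor pattern used in SQLConf.
final case class ConfigEntry[T](key: String, default: T, parse: String => T)

object MiniSQLConf {
  val DATAFRAME_PIVOT_MAX_VALUES: ConfigEntry[Int] =
    ConfigEntry("spark.sql.pivotMaxValues", 10000, _.toInt)
}

class MiniSQLConf {
  import MiniSQLConf._

  private val settings = scala.collection.mutable.Map.empty[String, String]

  def setConfString(key: String, value: String): Unit = settings.update(key, value)

  def getConf[T](entry: ConfigEntry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)

  // Analogous to `def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)` above.
  def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
}

Call sites can then read conf.dataFramePivotMaxValues instead of repeating getConf and the entry name at every use, which is exactly the churn the rest of this diff removes.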
@@ -204,7 +204,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
       new Path(userSpecified).toUri.toString
     }.orElse {
-      df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
         new Path(location, name).toUri.toString
       }
     }.getOrElse {
@@ -232,7 +232,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()

-    if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
       UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
     }

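For the checkpoint-location lookup in the first hunk above: an explicit per-query checkpointLocation option wins, otherwise the session-wide conf plus the query name supplies the path. A spark-shell style sketch under that assumption; host, port, and paths are illustrative:

// Paste into spark-shell; host, port and paths are illustrative only.
// Session-wide fallback: checkpoints land under <dir>/<query name> when no
// per-query option is given.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")

val counts = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
  .groupBy("value")
  .count()

val query = counts.writeStream
  .queryName("word_counts")
  .outputMode("complete")
  .format("console")
  // A per-query location would take precedence over the session-wide conf:
  // .option("checkpointLocation", "/tmp/checkpoints/word_counts")
  .start()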