2 changes: 2 additions & 0 deletions conf/log4j.properties.template
@@ -10,6 +10,8 @@ log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
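The two added lines quiet Parquet under both of its logger namespaces: `org.apache.parquet` (parquet-mr 1.7.0 and later) and the bare `parquet` namespace used by the older 1.6.x artifacts bundled with Hive. For illustration only (not part of this patch), roughly the same throttling could be applied programmatically through the log4j 1.x API:

```scala
import org.apache.log4j.{Level, LogManager}

// Illustrative sketch: mirror the two template lines by capping both Parquet
// logger namespaces at ERROR. The object name is made up for this example.
object ParquetLogLevels extends App {
  LogManager.getLogger("org.apache.parquet").setLevel(Level.ERROR)
  LogManager.getLogger("parquet").setLevel(Level.ERROR)
}
```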
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.net.URI
import java.util.logging.{Level, Logger => JLogger}
import java.util.logging.{Logger => JLogger}
import java.util.{List => JList}

import scala.collection.JavaConversions._
@@ -31,22 +31,22 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.{Log => ParquetLog}
import org.apache.parquet.{Log => ApacheParquetLog}
import org.slf4j.bridge.SLF4JBridgeHandler

import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}


private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
@@ -759,38 +759,39 @@ private[sql] object ParquetRelation extends Logging {
}.toOption
}

  def enableLogForwarding() {
    // Note: the org.apache.parquet.Log class has a static initializer that
    // sets the java.util.logging Logger for "org.apache.parquet". This
    // checks first to see if there are any handlers already set
    // and if not it creates them. If this method executes prior
    // to that class being loaded then:
    // 1) there are no handlers installed, so there are none to
    //    remove. But when it IS finally loaded, the desired effect
    //    of removing them is circumvented.
    // 2) The parquet.Log static initializer calls setUseParentHandlers(false),
    //    undoing the attempt to override the logging here.
    //
    // Therefore we need to force the class to be loaded.
    // This should really be resolved by Parquet.
    Utils.classForName(classOf[ParquetLog].getName)

    // Note: Logger.getLogger("parquet") has a default logger
    // that appends to Console, which needs to be cleared.
    val parquetLogger = JLogger.getLogger(classOf[ParquetLog].getPackage.getName)
    parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
    parquetLogger.setUseParentHandlers(true)

    // Disables a WARN log message in ParquetOutputCommitter. We first ensure that
    // ParquetOutputCommitter is loaded and the static LOG field gets initialized.
    // See https://issues.apache.org/jira/browse/SPARK-5968 for details.
    Utils.classForName(classOf[ParquetOutputCommitter].getName)
    JLogger.getLogger(classOf[ParquetOutputCommitter].getName).setLevel(Level.OFF)

    // Similar to the above, disables an unnecessary WARN log message in ParquetRecordReader.
    // See https://issues.apache.org/jira/browse/PARQUET-220 for details.
    Utils.classForName(classOf[ParquetRecordReader[_]].getName)
    JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
  val parquetLogger: JLogger = JLogger.getLogger("parquet")

  // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
  // we redirect the JUL logger via SLF4J JUL bridge handler.
  val redirectParquetLogsViaSLF4J: Unit = {
    def redirect(logger: JLogger): Unit = {
      logger.getHandlers.foreach(logger.removeHandler)
      logger.setUseParentHandlers(false)
      logger.addHandler(new SLF4JBridgeHandler)
    }

    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
    // scalastyle:off classforname
    Class.forName(classOf[ApacheParquetLog].getName)
    // scalastyle:on classforname
    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))

    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
    // namespace.
    try {
      // scalastyle:off classforname
      Class.forName("parquet.Log")
      // scalastyle:on classforname
      redirect(JLogger.getLogger("parquet"))
    } catch { case _: Throwable =>
      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly jar
      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
      // should be removed after this issue is fixed.
    }
  }

  // The parquet compression short names
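The `redirectParquetLogsViaSLF4J` block above works by swapping Parquet's console handlers for `SLF4JBridgeHandler`, a `java.util.logging.Handler` that forwards JUL records to whatever SLF4J backend is on the classpath. A minimal standalone sketch of the same pattern, assuming `slf4j-api` and `jul-to-slf4j` are available (the object and method names are made up for illustration):

```scala
import java.util.logging.{Logger => JLogger}

import org.slf4j.bridge.SLF4JBridgeHandler

object JulRedirectSketch {
  // Hold a strong reference: JUL's LogManager keeps loggers only weakly referenced,
  // so an otherwise unreferenced logger (and the handlers attached to it) can be
  // garbage collected, silently undoing the redirection.
  private val parquetJulLogger: JLogger = JLogger.getLogger("org.apache.parquet")

  def redirectToSlf4j(): Unit = {
    // Drop the console handlers installed by Parquet's static initializer.
    parquetJulLogger.getHandlers.foreach(parquetJulLogger.removeHandler)
    // Don't propagate to the root JUL logger, which has its own console handler.
    parquetJulLogger.setUseParentHandlers(false)
    // Route everything through SLF4J so the log4j/logback settings take effect.
    parquetJulLogger.addHandler(new SLF4JBridgeHandler)
  }
}
```

This is also why the patch keeps `apacheParquetLogger` and `parquetLogger` in vals rather than looking the loggers up on the fly: the strong references stop the redirected loggers from being garbage collected.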
@@ -52,7 +52,6 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
}

log.debug(s"write support initialized for requested schema $attributes")
ParquetRelation.enableLogForwarding()
new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
}

@@ -104,7 +104,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
extraMetadata,
"Spark")

ParquetRelation.enableLogForwarding()
ParquetFileWriter.writeMetadataFile(
conf,
path,
@@ -140,8 +139,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
(name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
}

ParquetRelation.enableLogForwarding()

// NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
// groups. Since Parquet schema is replicated among all row groups, we only need to touch a
// single row group to read schema related metadata. Notice that we are making assumptions that
7 changes: 6 additions & 1 deletion sql/hive/src/test/resources/log4j.properties
@@ -48,9 +48,14 @@ log4j.logger.hive.log=OFF
log4j.additivity.parquet.hadoop.ParquetRecordReader=false
log4j.logger.parquet.hadoop.ParquetRecordReader=OFF

log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF

log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF

log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF

log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
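A note on the `additivity` pairs added to this test configuration: `log4j.additivity.<logger>=false` stops events for that logger from also reaching ancestor appenders (such as the root console appender), so paired with a level of `OFF` the logger is silenced completely. A rough programmatic equivalent of one of the new pairs, for illustration only:

```scala
import org.apache.log4j.{Level, LogManager}

// Illustrative sketch of the ParquetOutputCommitter pair above; the object name is made up.
object SilenceParquetCommitter extends App {
  val logger = LogManager.getLogger("org.apache.parquet.hadoop.ParquetOutputCommitter")
  logger.setLevel(Level.OFF)   // log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
  logger.setAdditivity(false)  // log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
}
```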