From a643114e71e8d5054910a60b79ef1d1d392da452 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 26 Jun 2016 21:36:39 -0700
Subject: [PATCH 1/3] [SPARK-16221][SQL] Redirect Parquet JUL logger via SLF4J
 for WRITE operations

Remove a legacy try-catch block and fix some typos. (Bug)
---
 .../parquet/ParquetFileFormat.scala       | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2cce3db9a6925..88612374b8663 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -129,6 +129,8 @@ private[sql] class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    ParquetFileFormat.redirectParquetLogs
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
@@ -468,9 +470,9 @@ private[sql] class ParquetOutputWriterFactory(
   override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
 
     // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
-    private val hadoopTaskAttempId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
     private val hadoopAttemptContext = new TaskAttemptContextImpl(
-      serializableConf.value, hadoopTaskAttempId)
+      serializableConf.value, hadoopTaskAttemptId)
 
     // Instance of ParquetRecordWriter that does not use OutputCommitter
     private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
@@ -505,7 +507,7 @@ private[sql] class ParquetOutputWriterFactory(
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     throw new UnsupportedOperationException(
-      "this verison of newInstance not supported for " +
+      "this version of newInstance not supported for " +
         "ParquetOutputWriterFactory")
   }
 }
@@ -665,7 +667,7 @@ private[sql] object ParquetFileFormat extends Logging {
         Some(Try(DataType.fromJson(serializedSchema.get))
           .recover { case _: Throwable =>
             logInfo(
-              s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+              "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
                 "falling back to the deprecated DataType.fromCaseClassString parser.")
             LegacyTypeStringParser.parse(serializedSchema.get)
           }
@@ -880,7 +882,7 @@ private[sql] object ParquetFileFormat extends Logging {
     Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
       case _: Throwable =>
         logInfo(
-          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+          "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
             "falling back to the deprecated DataType.fromCaseClassString parser.")
         LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType]
     }.recoverWith {
@@ -915,15 +917,14 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
     // namespace.
-    try {
-      // scalastyle:off classforname
-      Class.forName("parquet.Log")
-      // scalastyle:on classforname
-      redirect(JLogger.getLogger("parquet"))
-    } catch { case _: Throwable =>
-      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
-      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
-      // should be removed after this issue is fixed.
-    }
+    // scalastyle:off classforname
+    Class.forName("parquet.Log")
+    // scalastyle:on classforname
+    redirect(JLogger.getLogger("parquet"))
   }
+
+  /**
+   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
+   */
+  def redirectParquetLogs(): Unit = {}
 }

From badf828fb1a7d54696f2ab5fb36746af919eae32 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 26 Jun 2016 22:27:58 -0700
Subject: [PATCH 2/3] Revert the removal of the SPARK-9974 try-catch block.

---
 .../datasources/parquet/ParquetFileFormat.scala | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 88612374b8663..f7ce292f56b5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -917,10 +917,16 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
     // namespace.
-    // scalastyle:off classforname
-    Class.forName("parquet.Log")
-    // scalastyle:on classforname
-    redirect(JLogger.getLogger("parquet"))
+    try {
+      // scalastyle:off classforname
+      Class.forName("parquet.Log")
+      // scalastyle:on classforname
+      redirect(JLogger.getLogger("parquet"))
+    } catch { case _: Throwable =>
+      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
+      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
+      // should be removed after this issue is fixed.
+    }
   }
 
   /**

From fd84576db8fa39c007aaf862f2857f3d5268bfbf Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 27 Jun 2016 09:22:49 -0700
Subject: [PATCH 3/3] Add ().

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f7ce292f56b5a..80002d4204f42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -129,7 +129,7 @@ private[sql] class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
-    ParquetFileFormat.redirectParquetLogs
+    ParquetFileFormat.redirectParquetLogs()
 
     new OutputWriterFactory {
       override def newInstance(
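
Why the empty method works: `redirectParquetLogs()` has no body of its own; the point of
calling it from `prepareWrite` is to force the JVM to initialize the `ParquetFileFormat`
companion object, whose initializer runs the JUL-to-SLF4J redirection shown in the hunks
above exactly once (the doc comment's `redirectParquetLogsViaSLF4J` appears to be the name
of that initializer, defined just above the hunk window). The sketch below is a minimal,
self-contained illustration of that idiom, not Spark's exact code: the object name is made
up here, and it assumes the jul-to-slf4j bridge (`org.slf4j.bridge.SLF4JBridgeHandler`) is
on the classpath.

    import java.util.logging.{Logger => JLogger}

    import org.slf4j.bridge.SLF4JBridgeHandler

    object ParquetLogRedirection {

      // Evaluated once, when the object is first touched: the JVM runs an
      // object's initializer exactly once, under its class-initialization
      // lock, so the redirection is both lazy and thread-safe.
      private val redirectParquetLogsViaSLF4J: Unit = {
        redirect(JLogger.getLogger("org.apache.parquet")) // parquet-mr 1.7+
        redirect(JLogger.getLogger("parquet"))            // parquet-mr <= 1.6
      }

      private def redirect(logger: JLogger): Unit = {
        // Drop the default JUL handlers, stop propagation to the root
        // logger, and hand every record to SLF4J via the bridge instead.
        logger.getHandlers.foreach(logger.removeHandler)
        logger.setUseParentHandlers(false)
        logger.addHandler(new SLF4JBridgeHandler)
      }

      // Deliberately empty: write paths call this no-op purely to force the
      // initializer above to run before Parquet emits its first log record.
      def redirectParquetLogs(): Unit = {}
    }

This also explains patch 3: in Scala 2, `ParquetFileFormat.redirectParquetLogs` without
parentheses still invokes the method, so patch 1 already worked; patch 3 adds the explicit
`()` that Scala style recommends for side-effecting calls.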