@@ -22,9 +22,6 @@ import java.io.File
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
@@ -783,52 +780,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("SPARK-8578 specified custom output committer will not be used to append data") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      val extraOptions = Map[String, String](
-        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-        // Since Parquet has its own output committer setting, also set it
-        // to AlwaysFailParquetOutputCommitter here.
-        "spark.sql.parquet.output.committer.class" ->
-          classOf[AlwaysFailParquetOutputCommitter].getName
-      )
-
-      val df = spark.range(1, 10).toDF("i")
-      withTempPath { dir =>
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        // Because the data already exists,
-        // this append should succeed because we will use the output committer associated
-        // with the file format, and AlwaysFailOutputCommitter will not be used.
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        checkAnswer(
-          spark.read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .load(dir.getCanonicalPath),
-          df.union(df))
-
-        // This will fail because AlwaysFailOutputCommitter is used when we overwrite.
-        intercept[Exception] {
-          df.write.mode("overwrite")
-            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
-        }
-      }
-      withTempPath { dir =>
-        // Because there is no existing data,
-        // this append will fail because AlwaysFailOutputCommitter is used when we do append
-        // and there is no existing data.
-        intercept[Exception] {
-          df.write.mode("append")
-            .options(extraOptions)
-            .format(dataSourceName)
-            .save(dir.getCanonicalPath)
-        }
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
@@ -898,27 +849,3 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 }
-
-// This class is used to test SPARK-8578. We should not use any custom output committer when
-// we actually append data to an existing dir.
-class AlwaysFailOutputCommitter(
-    outputPath: Path,
-    context: TaskAttemptContext)
-  extends FileOutputCommitter(outputPath, context) {
-
-  override def commitJob(context: JobContext): Unit = {
-    sys.error("Intentional job commitment failure for testing purpose.")
-  }
-}
-
-// This class is used to test SPARK-8578. We should not use any custom output committer when
-// we actually append data to an existing dir.
-class AlwaysFailParquetOutputCommitter(
-    outputPath: Path,
-    context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-
-  override def commitJob(context: JobContext): Unit = {
-    sys.error("Intentional job commitment failure for testing purpose.")
-  }
-}