
Commit 1bb0c8e

WeichenXu123 and mengxr committed
[SPARK-25348][SQL] Data source for binary files
## What changes were proposed in this pull request?

Implement binary file data source in Spark.

Format name: "binaryFile" (case-insensitive)

Schema:
- content: BinaryType
- status: StructType
  - path: StringType
  - modificationTime: TimestampType
  - length: LongType

Options:
* pathGlobFilter (instead of pathFilterRegex) to rely on GlobFilter behavior
* maxBytesPerPartition is not implemented since it is controlled by two SQL confs: maxPartitionBytes and openCostInBytes.

## How was this patch tested?

Unit test added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #24354 from WeichenXu123/binary_file_datasource.

Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
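A minimal usage sketch of the new source in Scala, based on the schema and options described above (the directory path and glob pattern are illustrative only; `spark` is assumed to be an existing `SparkSession`):

```scala
import org.apache.spark.sql.functions.col

// One row per matching file: a binary "content" column plus a nested "status" struct.
// "/tmp/images" and "*.png" are example values, not part of this commit.
val images = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/tmp/images")

// Fields of the status struct are addressed with dotted column names.
images.select(col("status.path"), col("status.length"), col("content")).show()
```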
1 parent 26ed65f commit 1bb0c8e

File tree: 3 files changed, 321 additions and 0 deletions


sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Lines changed: 1 addition & 0 deletions
@@ -8,3 +8,4 @@ org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
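This service entry is what lets `DataFrameReader.format("binaryFile")` resolve to the class registered above via its `DataSourceRegister` short name; a fully qualified class name also works without registration. A small sketch, assuming a `SparkSession` named `spark` and an example path:

```scala
// Resolved via the META-INF/services file and shortName() = "binaryFile".
val byShortName = spark.read.format("binaryFile").load("/tmp/binary-data") // example path

// Equivalent lookup by fully qualified class name.
val byClassName = spark.read
  .format("org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat")
  .load("/tmp/binary-data") // example path
```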
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala

Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.binaryfile

import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration

/**
 * The binary file data source.
 *
 * It reads binary files and converts each file into a single record that contains the raw content
 * and metadata of the file.
 *
 * Example:
 * {{{
 *   // Scala
 *   val df = spark.read.format("binaryFile")
 *     .option("pathGlobFilter", "*.png")
 *     .load("/path/to/fileDir")
 *
 *   // Java
 *   Dataset<Row> df = spark.read().format("binaryFile")
 *     .option("pathGlobFilter", "*.png")
 *     .load("/path/to/fileDir");
 * }}}
 */
class BinaryFileFormat extends FileFormat with DataSourceRegister {

  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = Some(BinaryFileFormat.schema)

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    throw new UnsupportedOperationException("Write is not supported for binary file data source")
  }

  override def isSplitable(
      sparkSession: SparkSession,
      options: Map[String, String],
      path: Path): Boolean = {
    false
  }

  override def shortName(): String = "binaryFile"

  override protected def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {

    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    val binaryFileSourceOptions = new BinaryFileSourceOptions(options)

    val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter

    (file: PartitionedFile) => {
      val path = file.filePath
      val fsPath = new Path(path)

      // TODO: Improve performance here: each file will recompile the glob pattern here.
      val globFilter = pathGlobPattern.map(new GlobFilter(_))
      if (!globFilter.isDefined || globFilter.get.accept(fsPath)) {
        val fs = fsPath.getFileSystem(broadcastedHadoopConf.value.value)
        val fileStatus = fs.getFileStatus(fsPath)
        val length = fileStatus.getLen()
        val modificationTime = fileStatus.getModificationTime()
        val stream = fs.open(fsPath)

        val content = try {
          ByteStreams.toByteArray(stream)
        } finally {
          Closeables.close(stream, true)
        }

        val fullOutput = dataSchema.map { f =>
          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
        }
        val requiredOutput = fullOutput.filter { a =>
          requiredSchema.fieldNames.contains(a.name)
        }

        // TODO: Add column pruning.
        // Currently it still reads the file content even if the content column is not required.
        val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)

        val internalRow = InternalRow(
          content,
          InternalRow(
            UTF8String.fromString(path),
            DateTimeUtils.fromMillis(modificationTime),
            length
          )
        )

        Iterator(requiredColumns(internalRow))
      } else {
        Iterator.empty
      }
    }
  }
}

object BinaryFileFormat {

  private val fileStatusSchema = StructType(
    StructField("path", StringType, false) ::
    StructField("modificationTime", TimestampType, false) ::
    StructField("length", LongType, false) :: Nil)

  /**
   * Schema for the binary file data source.
   *
   * Schema:
   *  - content (BinaryType): The content of the file.
   *  - status (StructType): The status of the file.
   *    - path (StringType): The path of the file.
   *    - modificationTime (TimestampType): The modification time of the file.
   *      In some Hadoop FileSystem implementations, this might be unavailable and fall back to
   *      some default value.
   *    - length (LongType): The length of the file in bytes.
   */
  val schema = StructType(
    StructField("content", BinaryType, true) ::
    StructField("status", fileStatusSchema, false) :: Nil)
}

class BinaryFileSourceOptions(
    @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  /**
   * An optional glob pattern to only include files with paths matching the pattern.
   * The syntax follows [[org.apache.hadoop.fs.GlobFilter]].
   */
  val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter")
}
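Since `isSplitable` returns false, each file maps to exactly one record, and, as the commit message notes, there is no `maxBytesPerPartition` option: how files are packed into read partitions is left to the existing file-source SQL confs. A hedged sketch using the standard conf keys (these confs are not introduced by this commit, and the values are arbitrary examples):

```scala
// Pre-existing Spark SQL confs that control how files are packed into read partitions.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024) // target bytes per partition
spark.conf.set("spark.sql.files.openCostInBytes", 4L * 1024 * 1024)     // estimated cost of opening one file

val df = spark.read.format("binaryFile").load("/tmp/binary-data")       // example path
```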
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.binaryfile

import java.io.File
import java.nio.file.{Files, StandardOpenOption}
import java.sql.Timestamp

import scala.collection.JavaConverters._

import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileSystem, GlobFilter, Path}

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.util.Utils

class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

  private var testDir: String = _

  private var fsTestDir: Path = _

  private var fs: FileSystem = _

  override def beforeAll(): Unit = {
    super.beforeAll()

    testDir = Utils.createTempDir().getAbsolutePath
    fsTestDir = new Path(testDir)
    fs = fsTestDir.getFileSystem(sparkContext.hadoopConfiguration)

    val year2014Dir = new File(testDir, "year=2014")
    year2014Dir.mkdir()
    val year2015Dir = new File(testDir, "year=2015")
    year2015Dir.mkdir()

    Files.write(
      new File(year2014Dir, "data.txt").toPath,
      Seq("2014-test").asJava,
      StandardOpenOption.CREATE, StandardOpenOption.WRITE
    )
    Files.write(
      new File(year2014Dir, "data2.bin").toPath,
      "2014-test-bin".getBytes,
      StandardOpenOption.CREATE, StandardOpenOption.WRITE
    )

    Files.write(
      new File(year2015Dir, "bool.csv").toPath,
      Seq("bool", "True", "False", "true").asJava,
      StandardOpenOption.CREATE, StandardOpenOption.WRITE
    )
    Files.write(
      new File(year2015Dir, "data.txt").toPath,
      "2015-test".getBytes,
      StandardOpenOption.CREATE, StandardOpenOption.WRITE
    )
  }

  def testBinaryFileDataSource(pathGlobFilter: String): Unit = {
    val resultDF = spark.read.format("binaryFile")
      .option("pathGlobFilter", pathGlobFilter)
      .load(testDir)
      .select(
        col("status.path"),
        col("status.modificationTime"),
        col("status.length"),
        col("content"),
        col("year") // this is a partition column
      )

    val expectedRowSet = new collection.mutable.HashSet[Row]()

    val globFilter = new GlobFilter(pathGlobFilter)
    for (partitionDirStatus <- fs.listStatus(fsTestDir)) {
      val dirPath = partitionDirStatus.getPath

      val partitionName = dirPath.getName.split("=")(1)
      val year = partitionName.toInt // value of the partition column "year", which is Int type

      for (fileStatus <- fs.listStatus(dirPath)) {
        if (globFilter.accept(fileStatus.getPath)) {
          val fpath = fileStatus.getPath.toString.replace("file:/", "file:///")
          val flen = fileStatus.getLen
          val modificationTime = new Timestamp(fileStatus.getModificationTime)

          val fcontent = {
            val stream = fs.open(fileStatus.getPath)
            val content = try {
              ByteStreams.toByteArray(stream)
            } finally {
              Closeables.close(stream, true)
            }
            content
          }

          val row = Row(fpath, modificationTime, flen, fcontent, year)
          expectedRowSet.add(row)
        }
      }
    }

    checkAnswer(resultDF, expectedRowSet.toSeq)
  }

  test("binary file data source test") {
    testBinaryFileDataSource(pathGlobFilter = "*.*")
    testBinaryFileDataSource(pathGlobFilter = "*.bin")
    testBinaryFileDataSource(pathGlobFilter = "*.txt")
    testBinaryFileDataSource(pathGlobFilter = "*.{txt,csv}")
    testBinaryFileDataSource(pathGlobFilter = "*.json")
  }

  test("binary file data source does not support write operation") {
    val df = spark.read.format("binaryFile").load(testDir)
    withTempDir { tmpDir =>
      val thrown = intercept[UnsupportedOperationException] {
        df.write
          .format("binaryFile")
          .save(tmpDir + "/test_save")
      }
      assert(thrown.getMessage.contains("Write is not supported for binary file data source"))
    }
  }

}
