FileSystem.scala
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.graphar.utils

import java.net.URI

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs

/** Helper object to write a DataFrame out as GraphAr chunk files. */
object FileSystem {
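  /** Rename the part-* files generated by Spark under filePrefix to GraphAr
   * chunk files (chunk0, chunk1, ...), numbered from startChunkIndex.
   */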
  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String, startChunkIndex: Int): Unit = {
    val sc = spark.sparkContext
    val file_system = fs.FileSystem.get(new URI(filePrefix), sc.hadoopConfiguration)
    val path_pattern = new fs.Path(filePrefix + "part*")
    val files = file_system.globStatus(path_pattern)
    for (i <- 0 until files.length) {
      val file_name = files(i).getPath.getName
      val new_file_name = "chunk" + (i + startChunkIndex).toString
      val destPath = new fs.Path(filePrefix + new_file_name)
      if (file_system.isFile(destPath)) {
        // if the chunk file already exists, overwrite it
        file_system.delete(destPath, false)
      }
      file_system.rename(new fs.Path(filePrefix + file_name), destPath)
    }
  }

  /** Write the input DataFrame to the output path with the given file format.
   * (A minimal usage sketch follows the object definition at the end of this file.)
   *
   * @param dataFrame DataFrame to write out.
   * @param fileType output file format type, the value could be csv|parquet|orc.
   * @param outputPrefix output path prefix.
   * @param startChunkIndex the start index of the chunks (default 0).
   */
  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String, startChunkIndex: Int = 0): Unit = {
    val spark = dataFrame.sparkSession
    // do not write the _SUCCESS marker file or the Parquet summary metadata files,
    // so that only data files remain under the output prefix
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    spark.conf.set("parquet.enable.summary-metadata", "false")
    dataFrame.write.mode("append").format(fileType).save(outputPrefix)
    // rename Spark's part-* output files to chunk files numbered from startChunkIndex
    renameSparkGeneratedFiles(spark, outputPrefix, startChunkIndex)
  }
}
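
// A minimal usage sketch, not part of the original file: the object name, app name,
// column name, and output path below are hypothetical, chosen only to illustrate how
// FileSystem.writeDataFrame is called. It writes the DataFrame as parquet chunk files
// (chunk0, chunk1, ...) under the given prefix.
object FileSystemExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("graphar-filesystem-example")
      .master("local[*]")
      .getOrCreate()
    // a toy DataFrame with a single "id" column
    val df = spark.range(0, 100).toDF("id")
    // note the trailing "/": chunk file names are built by string concatenation
    // with the output prefix, so the prefix should end with a path separator
    FileSystem.writeDataFrame(df, "parquet", "/tmp/graphar/example/id/")
    spark.stop()
  }
}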