
Commit 75ac402

[BugFix][Spark] Fix offset chunk output path and offset value of spark writer (#63)
1 parent 0991064 commit 75ac402

File tree

3 files changed: +56 −22 lines

spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala

+14 −8
@@ -22,15 +22,20 @@ import org.apache.hadoop.fs
 
 /** Helper object to write dataframe to chunk files */
 object FileSystem {
-  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
+  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String, startChunkIndex: Int): Unit = {
     val sc = spark.sparkContext
     val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
     val path_pattern = new fs.Path(filePrefix + "part*")
     val files = file_system.globStatus(path_pattern)
     for (i <- 0 until files.length) {
       val file_name = files(i).getPath.getName
-      val new_file_name = "chunk" + i.toString
-      file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
+      val new_file_name = "chunk" + (i + startChunkIndex).toString
+      val destPath = new fs.Path(filePrefix + new_file_name)
+      if (file_system.isFile(destPath)) {
+        // if chunk file already exists, overwrite it
+        file_system.delete(destPath)
+      }
+      file_system.rename(new fs.Path(filePrefix + file_name), destPath)
     }
   }
 
@@ -39,13 +44,14 @@ object FileSystem {
    * @param dataframe DataFrame to write out.
    * @param fileType output file format type, the value could be csv|parquet|orc.
    * @param outputPrefix output path prefix.
+   * @param startChunkIndex the start index of chunk.
+   *
    */
-  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
+  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String, startChunkIndex: Int = 0): Unit = {
     val spark = dataFrame.sparkSession
     spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
     spark.conf.set("parquet.enable.summary-metadata", "false")
-    // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
-    dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
-    renameSparkGeneratedFiles(spark, outputPrefix)
+    dataFrame.write.mode("append").format(fileType).save(outputPrefix)
+    renameSparkGeneratedFiles(spark, outputPrefix, startChunkIndex)
   }
-}
+}
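With the new parameter, successive writes into the same directory no longer clobber each other's chunk files: the write uses append mode, and the renamed files start numbering at startChunkIndex. A minimal usage sketch, assuming the GraphAr Spark classes are on the classpath; the output path, column name, and data below are made up for illustration:

import org.apache.spark.sql.SparkSession

object ChunkNamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("chunk-naming-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // hypothetical output directory for the offset chunks of one adj list
    val outputPrefix = "/tmp/graphar_sketch/offset/"

    // first vertex chunk: its single part file is renamed to "chunk0"
    val chunk0 = Seq(0L, 2L, 5L).toDF("offset").coalesce(1)
    com.alibaba.graphar.utils.FileSystem.writeDataFrame(chunk0, "csv", outputPrefix, 0)

    // second vertex chunk: startChunkIndex = 1 makes the renamed file "chunk1",
    // and append mode leaves the previously written "chunk0" in place
    val chunk1 = Seq(5L, 5L, 9L).toDF("offset").coalesce(1)
    com.alibaba.graphar.utils.FileSystem.writeDataFrame(chunk1, "csv", outputPrefix, 1)

    spark.stop()
  }
}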

spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala

+28 −11
@@ -21,7 +21,7 @@ import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, Prop
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{LongType, StructField}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType, StructField}
 import org.apache.spark.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
@@ -148,19 +148,36 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
     }
   }
 
-  // generate the Offset chunks files from edge dataframe for this edge type
+  // generate the offset chunk files from the edge dataframe for this edge type
   private def writeOffset(): Unit = {
+    val spark = edgeDf.sparkSession
     val file_type = edgeInfo.getAdjListFileType(adjListType)
-    var chunk_index: Long = 0
+    var chunk_index: Int = 0
+    val offset_schema = StructType(Seq(StructField(GeneralParams.offsetCol, LongType)))
+    val vertex_chunk_size = if (adjListType == AdjListType.ordered_by_source) edgeInfo.getSrc_chunk_size() else edgeInfo.getDst_chunk_size()
+    val index_column = if (adjListType == AdjListType.ordered_by_source) GeneralParams.srcIndexCol else GeneralParams.dstIndexCol
+    val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType)
     for (chunk <- chunks) {
-      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
-      if (adjListType == AdjListType.ordered_by_source) {
-        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      } else {
-        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      }
+      val edge_count_df = chunk.select(index_column).groupBy(index_column).count()
+      // init an edge count dataframe over the vertex range [begin, end] to include isolated vertices
+      val begin_index: Long = chunk_index * vertex_chunk_size
+      val end_index: Long = (chunk_index + 1) * vertex_chunk_size
+      val init_count_rdd = spark.sparkContext.parallelize(begin_index to end_index).map(key => Row(key, 0L))
+      val init_count_df = spark.createDataFrame(init_count_rdd, edge_count_df.schema)
+      // union the edge count dataframe with the initialized count dataframe
+      val union_count_chunk = edge_count_df.unionByName(init_count_df).groupBy(index_column).agg(sum("count")).coalesce(1).orderBy(index_column).select("sum(count)")
+      // calculate the offset rdd from the count chunk
+      val offset_rdd = union_count_chunk.rdd.mapPartitionsWithIndex((i, ps) => {
+        var sum = 0L
+        var pre_sum = 0L
+        for (row <- ps) yield {
+          pre_sum = sum
+          sum = sum + row.getLong(0)
+          Row(pre_sum)
+        }
+      })
+      val offset_df = spark.createDataFrame(offset_rdd, offset_schema)
+      FileSystem.writeDataFrame(offset_df, FileType.FileTypeToString(file_type), output_prefix, chunk_index)
       chunk_index = chunk_index + 1
     }
   }
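The rewritten writeOffset builds each offset chunk as an exclusive prefix sum over per-vertex edge counts, after unioning in zero counts for every vertex index of the chunk range so isolated vertices still get an offset row. A self-contained sketch of that computation, with column names, data, and chunk size invented for the example:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object OffsetComputationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("offset-sketch").getOrCreate()
    import spark.implicits._

    // edges of one vertex chunk as (srcIndex, dstIndex); vertex 1 has no out-edges
    val edges = Seq((0L, 2L), (0L, 3L), (2L, 1L)).toDF("srcIndex", "dstIndex")
    val vertexChunkSize = 4L   // hypothetical vertex chunk size
    val chunkIndex = 0

    // per-vertex edge counts, plus zero counts for every vertex index in the range
    val counts = edges.groupBy("srcIndex").count()
    val begin = chunkIndex * vertexChunkSize
    val end = (chunkIndex + 1) * vertexChunkSize
    val zeroRows = spark.sparkContext.parallelize(begin to end).map(v => Row(v, 0L))
    val zeros = spark.createDataFrame(zeroRows, counts.schema)

    val merged = counts.unionByName(zeros)
      .groupBy("srcIndex").agg(sum("count"))
      .coalesce(1).orderBy("srcIndex").select("sum(count)")

    // exclusive prefix sum: offset(i) = number of edges of all vertices before i
    val offsetsRdd = merged.rdd.mapPartitionsWithIndex { (_, rows) =>
      var running = 0L
      rows.map { row =>
        val previous = running
        running += row.getLong(0)
        Row(previous)
      }
    }
    val offsetSchema = StructType(Seq(StructField("offset", LongType)))
    spark.createDataFrame(offsetsRdd, offsetSchema).show()
    // counts per vertex 0..4 are [2, 0, 1, 0, 0], so the offsets are 0, 2, 2, 3, 3

    spark.stop()
  }
}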

spark/src/test/scala/com/alibaba/graphar/TestWriter.scala

+14 −3
@@ -23,6 +23,8 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.yaml.snakeyaml.Yaml
 import org.yaml.snakeyaml.constructor.Constructor
 import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.io.Source.fromFile
+
 
 class WriterSuite extends AnyFunSuite {
   val spark = SparkSession.builder()
@@ -64,7 +66,8 @@ class WriterSuite extends AnyFunSuite {
     val invalid_property_group= new PropertyGroup()
     assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "vertex"))
     fs.close()
   }
 
@@ -115,7 +118,8 @@
     // throw exception if pass the adj list type not contain in edge info
     assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 
@@ -161,6 +165,12 @@
     val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
     val offset_chunk_files = fs.globStatus(offset_path_pattern)
     assert(offset_chunk_files.length == 10)
+    // compare with correct offset chunk value
+    val offset_file_path = prefix + edge_info.getAdjListOffsetFilePath(0, adj_list_type)
+    val correct_offset_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/chunk0").getPath
+    val generated_offset_array = fromFile(offset_file_path).getLines.toArray
+    val expected_offset_array = fromFile(correct_offset_file_path).getLines.toArray
+    assert(generated_offset_array.sameElements(expected_offset_array))
 
     // test write property group
     val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
@@ -171,7 +181,8 @@
 
     writer.writeEdges()
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 }
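The new assertion follows a simple pattern: read the generated offset chunk and the reference chunk line by line and require them to match. A standalone sketch of that comparison, with placeholder file paths:

import scala.io.Source

object CompareOffsetChunkSketch {
  def main(args: Array[String]): Unit = {
    // placeholder paths; in the test they come from EdgeInfo and the test resources
    val generatedPath = "/tmp/graphar_sketch/offset/chunk0"
    val expectedPath = "src/test/resources/expected_offset_chunk0"

    val generated = Source.fromFile(generatedPath)
    val expected = Source.fromFile(expectedPath)
    try {
      val same = generated.getLines().toArray.sameElements(expected.getLines().toArray)
      println(s"offset chunk matches the expected values: $same")
    } finally {
      // close the sources so the file handles are released
      generated.close()
      expected.close()
    }
  }
}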
