Skip to content

Commit

Permalink
feat(spark): Add an example to generate ldbc sample data from csv to …
Browse files Browse the repository at this point in the history
…graphar

Signed-off-by: acezen <qiaozi.zwb@alibaba-inc.com>
  • Loading branch information
acezen committed May 21, 2024
1 parent 0136d16 commit de14a9a
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/spark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ jobs:
# run the importer
cd import
./neo4j.sh neo4j.json
- name: Run LdbcSample2GraphAr example
working-directory: maven-projects/spark
run: |
export JAVA_HOME=${JAVA_HOME_11_X64}
export SPARK_HOME="${HOME}/${{ matrix.spark-hadoop }}"
export PATH="${SPARK_HOME}/bin":"${PATH}"
scripts/build.sh ${{ matrix.mvn-profile }}
scripts/run-ldbc-sample2graphar.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.graphar.example

import org.apache.graphar.graph.GraphWriter

import org.apache.spark.sql.SparkSession

object LdbcSample2GraphAr {

def main(args: Array[String]): Unit = {
// connect to the Neo4j instance
val spark = SparkSession
.builder()
.appName("LdbcSample Data to GraphAr")
.config("spark.master", "local")
.getOrCreate()

// initialize a graph writer
val writer: GraphWriter = new GraphWriter()

// person vertex csv input path
val personInputPath: String = args(0)
// person_knows_person edge csv input path
val personKnowsPersonInputPath: String = args(1)
// output directory
val outputPath: String = args(2)
// vertex chunk size
val vertexChunkSize: Long = args(3).toLong
// edge chunk size
val edgeChunkSize: Long = args(4).toLong
// file type
val fileType: String = args(5)

// put Ldbc sample graph data into writer
readAndPutDataIntoWriter(writer, spark, personInputPath, personKnowsPersonInputPath)

// write in GraphAr format
writer.write(
outputPath,
spark,
"LdbcSample",
vertexChunkSize,
edgeChunkSize,
fileType
)
}

// read data from Neo4j and put into writer
def readAndPutDataIntoWriter(
writer: GraphWriter,
spark: SparkSession,
personInputPath: String,
personKnowsPersonInputPath: String
): Unit = {
// read vertices with label "Person" from given path as a DataFrame
val person_df = spark.read
.option("delimiter", "|")
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.load(personInputPath)
// put into writer, vertex label is "Person"
writer.PutVertexData("Person", person_df)

// read edges with type "Person"->"Knows"->"Person" from given path as a DataFrame
val produced_edge_df = spark.read
.option("delimiter", "|")
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.load(personKnowsPersonInputPath)
// put into writer, source vertex label is "Person", edge label is "Knows"
// target vertex label is "Person"
writer.PutEdgeData(("Person", "Knows", "Person"), produced_edge_df)
}
}
33 changes: 33 additions & 0 deletions maven-projects/spark/scripts/run-ldbc2graphar.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


set -eu

cur_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
jar_file="${cur_dir}/../graphar/target/graphar-commons-0.1.0-SNAPSHOT-shaded.jar"
person_input_file="${cur_dir}/../../../testing/ldbc_sample/person_0_0.csv"
person_knows_person_input_file="${cur_dir}/../../../testing/ldbc_sample/person_knows_person_0_0.csv"
output_dir="/tmp/graphar/ldbc_sample"

vertex_chunk_size=100
edge_chunk_size=1024
file_type="parquet"
spark-submit --class org.apache.graphar.example.LdbcSample2GraphAr ${jar_file} \
${person_input_file} ${person_knows_person_input_file} ${output_dir} ${vertex_chunk_size} ${edge_chunk_size} ${file_type}

0 comments on commit de14a9a

Please sign in to comment.