
Commit 01a7117
Update spark and pyspark
Signed-off-by: acezen <qiaozi.zwb@alibaba-inc.com>
acezen committed Aug 27, 2024
1 parent 2fd69db commit 01a7117
Showing 22 changed files with 219 additions and 219 deletions.
@@ -29,9 +29,9 @@ import org.yaml.snakeyaml.LoaderOptions

 /** Edge info is a class to store the edge meta information. */
 class EdgeInfo() {
-  @BeanProperty var src_label: String = ""
-  @BeanProperty var edge_label: String = ""
-  @BeanProperty var dst_label: String = ""
+  @BeanProperty var src_type: String = ""
+  @BeanProperty var edge_type: String = ""
+  @BeanProperty var dst_type: String = ""
   @BeanProperty var chunk_size: Long = 0
   @BeanProperty var src_chunk_size: Long = 0
   @BeanProperty var dst_chunk_size: Long = 0
@@ -277,7 +277,7 @@ class EdgeInfo() {

   /** Check if the edge info is validated. */
   def isValidated(): Boolean = {
-    if (src_label == "" || edge_label == "" || dst_label == "") {
+    if (src_type == "" || edge_type == "" || dst_type == "") {
       return false
     }
     if (chunk_size <= 0 || src_chunk_size <= 0 || dst_chunk_size <= 0) {
@@ -585,15 +585,15 @@ class EdgeInfo() {
   }

   def getConcatKey(): String = {
-    return getSrc_label + GeneralParams.regularSeparator + getEdge_label + GeneralParams.regularSeparator + getDst_label
+    return getSrc_type + GeneralParams.regularSeparator + getEdge_type + GeneralParams.regularSeparator + getDst_type
   }

   /** Dump to Yaml string. */
   def dump(): String = {
     val data = new java.util.HashMap[String, Object]()
-    data.put("src_label", src_label)
-    data.put("edge_label", edge_label)
-    data.put("dst_label", dst_label)
+    data.put("src_type", src_type)
+    data.put("edge_type", edge_type)
+    data.put("dst_type", dst_type)
     data.put("chunk_size", new java.lang.Long(chunk_size))
     data.put("src_chunk_size", new java.lang.Long(src_chunk_size))
     data.put("dst_chunk_size", new java.lang.Long(dst_chunk_size))
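For orientation, here is a minimal sketch (not part of the commit) of how the renamed EdgeInfo bean API reads after this change. The setter names follow Scala's @BeanProperty convention, and the separator used by getConcatKey comes from GeneralParams.regularSeparator, whose value is not shown in this diff:

    // Illustrative values only; the printed key assumes "_" as the separator.
    val edgeInfo = new EdgeInfo()
    edgeInfo.setSrc_type("person")
    edgeInfo.setEdge_type("knows")
    edgeInfo.setDst_type("person")
    edgeInfo.setChunk_size(1024)
    edgeInfo.setSrc_chunk_size(100)
    edgeInfo.setDst_chunk_size(100)
    // getConcatKey() now joins src_type, edge_type and dst_type,
    // e.g. "person_knows_person".
    println(edgeInfo.getConcatKey())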
@@ -323,24 +323,24 @@ class GraphInfo() {
   var edgeInfos: Map[String, EdgeInfo] = Map[String, EdgeInfo]()

   def addVertexInfo(vertexInfo: VertexInfo): Unit = {
-    vertexInfos += (vertexInfo.getLabel -> vertexInfo)
+    vertexInfos += (vertexInfo.getType -> vertexInfo)
   }

   def addEdgeInfo(edgeInfo: EdgeInfo): Unit = {
     edgeInfos += (edgeInfo.getConcatKey() -> edgeInfo)
   }

-  def getVertexInfo(label: String): VertexInfo = {
-    vertexInfos(label)
+  def getVertexInfo(vertexType: String): VertexInfo = {
+    vertexInfos(vertexType)
   }

   def getEdgeInfo(
-      srcLabel: String,
-      edgeLabel: String,
-      dstLabel: String
+      srcType: String,
+      edgeType: String,
+      dstType: String
   ): EdgeInfo = {
     val key =
-      srcLabel + GeneralParams.regularSeparator + edgeLabel + GeneralParams.regularSeparator + dstLabel
+      srcType + GeneralParams.regularSeparator + edgeType + GeneralParams.regularSeparator + dstType
     edgeInfos(key)
   }
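The GraphInfo accessors mirror the same rename; a hedged usage sketch (the graphInfo instance and type names are illustrative):

    // getVertexInfo is keyed by vertex type; getEdgeInfo builds the same
    // separator-joined key as EdgeInfo.getConcatKey(), so addEdgeInfo and
    // getEdgeInfo stay consistent.
    val personInfo: VertexInfo = graphInfo.getVertexInfo("person")
    val knowsInfo: EdgeInfo = graphInfo.getEdgeInfo("person", "knows", "person")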
@@ -28,7 +28,7 @@ import org.yaml.snakeyaml.LoaderOptions

 /** VertexInfo is a class to store the vertex meta information. */
 class VertexInfo() {
-  @BeanProperty var label: String = ""
+  @BeanProperty var `type`: String = ""
   @BeanProperty var chunk_size: Long = 0
   @BeanProperty var prefix: String = ""
   @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]()
@@ -200,7 +200,7 @@ class VertexInfo() {
    *   true if the vertex info is validated, otherwise return false.
    */
   def isValidated(): Boolean = {
-    if (label == "" || chunk_size <= 0) {
+    if (`type` == "" || chunk_size <= 0) {
       return false
     }
     val len: Int = property_groups.size
@@ -283,7 +283,7 @@ class VertexInfo() {
   /** Dump to Yaml string. */
   def dump(): String = {
     val data = new java.util.HashMap[String, Object]()
-    data.put("label", label)
+    data.put("type", `type`)
     data.put("chunk_size", new java.lang.Long(chunk_size))
     if (prefix != "") data.put("prefix", prefix)
     data.put("version", version)
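Because `type` is a reserved word in Scala, the renamed field must be written in backquotes inside the class, while the @BeanProperty accessors come out as plain getType/setType. A small sketch under those assumptions:

    // Illustrative values; dump() now emits a "type:" key in the YAML
    // instead of the old "label:" key.
    val vertexInfo = new VertexInfo()
    vertexInfo.setType("person")
    vertexInfo.setChunk_size(100)
    println(vertexInfo.dump())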
@@ -84,10 +84,10 @@ object GraphAr2Nebula {
       edgeData: Map[(String, String, String), Map[String, DataFrame]]
   ): Unit = {
     // write each edge type to Nebula
-    edgeData.foreach { case (srcEdgeDstLabels, orderMap) =>
-      val sourceTag = srcEdgeDstLabels._1
-      val edgeType = srcEdgeDstLabels._2
-      val targetTag = srcEdgeDstLabels._3
+    edgeData.foreach { case (srcEdgeDstTypes, orderMap) =>
+      val sourceTag = srcEdgeDstTypes._1
+      val edgeType = srcEdgeDstTypes._2
+      val targetTag = srcEdgeDstTypes._3
       val sourcePrimaryKey = graphInfo.getVertexInfo(sourceTag).getPrimaryKey()
       val targetPrimaryKey = graphInfo.getVertexInfo(targetTag).getPrimaryKey()
       val sourceDF = vertexData(sourceTag)
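A readability note on the renamed tuple: the three components can also be bound directly in the pattern instead of via _1/_2/_3, as in this behavior-preserving sketch (the surrounding writer logic is assumed from the example above):

    edgeData.foreach { case ((sourceTag, edgeType, targetTag), orderMap) =>
      val sourcePrimaryKey = graphInfo.getVertexInfo(sourceTag).getPrimaryKey()
      val targetPrimaryKey = graphInfo.getVertexInfo(targetTag).getPrimaryKey()
      // ... write orderMap's DataFrames to Nebula as before ...
    }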
@@ -72,7 +72,7 @@ object GraphAr2Neo4j {
       .write
       .format("org.neo4j.spark.DataSource")
       .mode(SaveMode.Overwrite)
       // "labels" is an option key defined by the Neo4j Spark connector, so it
       // is not part of GraphAr's label -> type rename and stays as-is
       .option("labels", ":" + key)
       .option("node.keys", primaryKey)
       .save()
     }
@@ -88,15 +88,15 @@
     // write each edge type to Neo4j
     edgeData.foreach {
       case (key, value) => {
-        val sourceLabel = key._1
-        val edgeLabel = key._2
-        val targetLabel = key._3
+        val sourceType = key._1
+        val edgeType = key._2
+        val targetType = key._3
         val sourcePrimaryKey =
-          graphInfo.getVertexInfo(sourceLabel).getPrimaryKey()
+          graphInfo.getVertexInfo(sourceType).getPrimaryKey()
         val targetPrimaryKey =
-          graphInfo.getVertexInfo(targetLabel).getPrimaryKey()
-        val sourceDf = vertexData(sourceLabel)
-        val targetDf = vertexData(targetLabel)
+          graphInfo.getVertexInfo(targetType).getPrimaryKey()
+        val sourceDf = vertexData(sourceType)
+        val targetDf = vertexData(targetType)
         // convert the source and target index column to the primary key column
         val df = Utils.joinEdgesWithVertexPrimaryKey(
           value.head._2,
@@ -107,17 +107,17 @@
         ) // use the first DataFrame of (adj_list_type_str, DataFrame) map

         // FIXME: use properties message in edge info
-        val properties = if (edgeLabel == "REVIEWED") "rating,summary" else ""
+        val properties = if (edgeType == "REVIEWED") "rating,summary" else ""

         df.write
           .format("org.neo4j.spark.DataSource")
           .mode(SaveMode.Overwrite)
-          .option("relationship", edgeLabel)
+          .option("relationship", edgeType)
           .option("relationship.save.strategy", "keys")
-          .option("relationship.source.labels", ":" + sourceLabel)
+          .option("relationship.source.labels", ":" + sourceType)
           .option("relationship.source.save.mode", "match")
           .option("relationship.source.node.keys", "src:" + sourcePrimaryKey)
-          .option("relationship.target.labels", ":" + targetLabel)
+          .option("relationship.target.labels", ":" + targetType)
           .option("relationship.target.save.mode", "match")
           .option("relationship.target.node.keys", "dst:" + targetPrimaryKey)
           .option("relationship.properties", properties)
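Note on the option keys left as "labels" above: `labels`, `relationship.source.labels`, and `relationship.target.labels` are option names defined by the Neo4j Spark connector itself, so the label-to-type rename applies to GraphAr's own identifiers and local variables but must not touch these keys. A minimal write sketch under that assumption:

    // GraphAr-side values use the new *Type names; the connector's option
    // keys keep their documented "labels" spelling.
    df.write
      .format("org.neo4j.spark.DataSource")
      .mode(SaveMode.Overwrite)
      .option("relationship", edgeType)
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":" + sourceType)
      .option("relationship.target.labels", ":" + targetType)
      .save()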
@@ -81,14 +81,14 @@ object LdbcSample2GraphAr {
       personInputPath: String,
       personKnowsPersonInputPath: String
   ): Unit = {
-    // read vertices with label "Person" from given path as a DataFrame
+    // read vertices with type "Person" from given path as a DataFrame
     val person_df = spark.read
       .option("delimiter", "|")
       .option("header", "true")
       .option("inferSchema", "true")
       .format("csv")
       .load(personInputPath)
-    // put into writer, vertex label is "Person"
+    // put into writer, vertex type is "Person"
     writer.PutVertexData("Person", person_df)

     // read edges with type "Person"->"Knows"->"Person" from given path as a DataFrame
@@ -98,8 +98,8 @@
       .option("inferSchema", "true")
       .format("csv")
       .load(personKnowsPersonInputPath)
-    // put into writer, source vertex label is "Person", edge label is "Knows"
-    // target vertex label is "Person"
+    // put into writer, source vertex type is "Person", edge type is "Knows"
+    // target vertex type is "Person"
     writer.PutEdgeData(("Person", "Knows", "Person"), knows_edge_df)
   }
 }
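For context, the writer used in this example is GraphAr's Spark GraphWriter; a hedged end-to-end sketch (the output path, graph name, and the write() signature are assumptions, not shown in this diff):

    // Assumed entry points: PutVertexData/PutEdgeData buffer DataFrames keyed
    // by vertex type and by (srcType, edgeType, dstType); write() persists them.
    val writer = new GraphWriter()
    writer.PutVertexData("Person", person_df)
    writer.PutEdgeData(("Person", "Knows", "Person"), knows_edge_df)
    writer.write("/tmp/ldbc_sample", spark, "LdbcSample")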
@@ -82,7 +82,7 @@ object Neo4j2GraphAr {
       .option("query", "MATCH (n:Person) RETURN n.name AS name, n.born as born")
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, vertex label is "Person"
+    // put into writer, vertex type is "Person"
     writer.PutVertexData("Person", person_df)

     // read vertices with label "Movie" from Neo4j as a DataFrame
@@ -94,7 +94,7 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, vertex label is "Movie"
+    // put into writer, vertex type is "Movie"
     writer.PutVertexData("Movie", movie_df)

     // read edges with type "Person"->"PRODUCED"->"Movie" from Neo4j as a DataFrame
@@ -106,8 +106,8 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, source vertex label is "Person", edge label is "PRODUCED"
-    // target vertex label is "Movie"
+    // put into writer, source vertex type is "Person", edge type is "PRODUCED"
+    // target vertex type is "Movie"
     writer.PutEdgeData(("Person", "PRODUCED", "Movie"), produced_edge_df)

     // read edges with type "Person"->"ACTED_IN"->"Movie" from Neo4j as a DataFrame
@@ -119,8 +119,8 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, source vertex label is "Person", edge label is "ACTED_IN"
-    // target vertex label is "Movie"
+    // put into writer, source vertex type is "Person", edge type is "ACTED_IN"
+    // target vertex type is "Movie"
     writer.PutEdgeData(("Person", "ACTED_IN", "Movie"), acted_in_edge_df)

     // read edges with type "Person"->"DIRECTED"->"Movie" from Neo4j as a DataFrame
@@ -132,8 +132,8 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, source vertex label is "Person", edge label is "DIRECTED"
-    // target vertex label is "Movie"
+    // put into writer, source vertex type is "Person", edge type is "DIRECTED"
+    // target vertex type is "Movie"
     writer.PutEdgeData(("Person", "DIRECTED", "Movie"), directed_edge_df)

     // read edges with type "Person"->"FOLLOWS"->"Person" from Neo4j as a DataFrame
@@ -145,8 +145,8 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, source vertex label is "Person", edge label is "FOLLOWS"
-    // target vertex label is "Person"
+    // put into writer, source vertex type is "Person", edge type is "FOLLOWS"
+    // target vertex type is "Person"
     writer.PutEdgeData(("Person", "FOLLOWS", "Person"), follows_edge_df)

     // read edges with type "Person"->"REVIEWED"->"Movie" from Neo4j as a DataFrame
@@ -158,8 +158,8 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, source vertex label is "Person", edge label is "REVIEWED"
-    // target vertex label is "Movie"
+    // put into writer, source vertex type is "Person", edge type is "REVIEWED"
+    // target vertex type is "Movie"
     writer.PutEdgeData(("Person", "REVIEWED", "Movie"), reviewed_edge_df)

     // read edges with type "Person"->"WROTE"->"Movie" from Neo4j as a DataFrame
@@ -171,8 +171,8 @@
       )
       .option("schema.flatten.limit", 1)
       .load()
-    // put into writer, source vertex label is "Person", edge label is "WROTE"
-    // target vertex label is "Movie"
+    // put into writer, source vertex type is "Person", edge type is "WROTE"
+    // target vertex type is "Movie"
     writer.PutEdgeData(("Person", "WROTE", "Movie"), wrote_edge_df)
   }
 }
@@ -35,21 +35,21 @@ object GraphReader {
    * @param prefix
    *   The absolute prefix.
    * @param vertexInfos
-   *   The map of (vertex label -> VertexInfo) for the graph.
+   *   The map of (vertex type -> VertexInfo) for the graph.
    * @param spark
    *   The Spark session for the reading.
    * @return
-   *   The map of (vertex label -> DataFrame)
+   *   The map of (vertex type -> DataFrame)
    */
   private def readAllVertices(
       prefix: String,
       vertexInfos: Map[String, VertexInfo],
       spark: SparkSession
   ): Map[String, DataFrame] = {
     val vertex_dataframes: Map[String, DataFrame] = vertexInfos.map {
-      case (label, vertexInfo) => {
+      case (vertex_type, vertexInfo) => {
         val reader = new VertexReader(prefix, vertexInfo, spark)
-        (label, reader.readAllVertexPropertyGroups())
+        (vertex_type, reader.readAllVertexPropertyGroups())
       }
     }
     return vertex_dataframes
@@ -61,11 +61,11 @@
    * @param prefix
    *   The absolute prefix.
    * @param edgeInfos
-   *   The map of ((srcLabel, edgeLabel, dstLabel) -> EdgeInfo) for the graph.
+   *   The map of ((srcType, edgeType, dstType) -> EdgeInfo) for the graph.
    * @param spark
    *   The Spark session for the reading.
    * @return
-   *   The map of ((srcLabel, edgeLabel, dstLabel) -> (adj_list_type_str ->
+   *   The map of ((srcType, edgeType, dstType) -> (adj_list_type_str ->
    *   DataFrame))
    */
   private def readAllEdges(
@@ -91,9 +91,9 @@
       }
       (
         (
-          edgeInfo.getSrc_label(),
-          edgeInfo.getEdge_label(),
-          edgeInfo.getDst_label()
+          edgeInfo.getSrc_type(),
+          edgeInfo.getEdge_type(),
+          edgeInfo.getDst_type()
         ),
         adj_list_type_edge_df_map
       )
@@ -111,8 +111,8 @@
    * The Spark session for the loading.
    * @return
    *   Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
-   *   stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
-   *   stored as a map of ((srcLabel, edgeLabel, dstLabel) -> (adj_list_type_str
+   *   stored as the map of (vertex_type -> DataFrame) the edge DataFrames are
+   *   stored as a map of ((srcType, edgeType, dstType) -> (adj_list_type_str
    *   -> DataFrame))
    */
   def readWithGraphInfo(
@@ -144,8 +144,8 @@
    * The Spark session for the loading.
    * @return
    *   Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
-   *   stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
-   *   stored as a map of (srcLabel_edgeLabel_dstLabel -> (adj_list_type_str ->
+   *   stored as the map of (vertex_type -> DataFrame) the edge DataFrames are
+   *   stored as a map of (srcType_edgeType_dstType -> (adj_list_type_str ->
    *   DataFrame))
    */
   def read(
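A hedged usage sketch of the reader after the rename (the .graph.yml path and type names are illustrative, and loadGraphInfo is assumed to be the usual entry point):

    // Vertex DataFrames come back keyed by vertex type; edge DataFrames by the
    // (srcType, edgeType, dstType) triple, each mapping adj-list type to data.
    val graphInfo = GraphInfo.loadGraphInfo("/tmp/ldbc_sample/ldbc_sample.graph.yml", spark)
    val (vertexDfs, edgeDfs) = GraphReader.readWithGraphInfo(graphInfo, spark)
    val personDf = vertexDfs("person")
    val knowsDfMap = edgeDfs(("person", "knows", "person"))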
