3 changes: 3 additions & 0 deletions spark-cypher/build.gradle
@@ -9,6 +9,9 @@ dependencies {

compile group: 'org.neo4j.driver', name: 'neo4j-java-driver', version: ver.neo4j.driver

compile group: 'org.apache.spark', name: "spark-graph-api".scala(), version: "3.0.0-SNAPSHOT"
compile group: 'org.apache.spark', name: "spark-cypher".scala(), version: "3.0.0-SNAPSHOT"

compileOnly group: 'org.apache.spark', name: "spark-core".scala(), version: ver.spark
compileOnly group: 'org.apache.spark', name: "spark-sql".scala(), version: ver.spark
compileOnly group: 'org.apache.spark', name: "spark-catalyst".scala(), version: ver.spark
@@ -0,0 +1,6 @@
{
  "tableStorageFormat" : "csv",
  "tags" : [
    0
  ]
}
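This metadata file accompanies the products example graph for the CAPS file-system graph source: it records that the graph's tables are stored as CSV, along with the graph's tags (which CAPS, as far as I can tell, uses to keep node ids from different graphs disjoint when graphs are combined).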
@@ -0,0 +1,12 @@
2001,Alice
2002,Bob
2003,Eve
2004,Carol
2005,Carl
2006,Dave
2007,Mallory
2008,Trudy
2009,Trent
2010,Oscar
2011,Victor
2012,Peggy
@@ -0,0 +1,16 @@
1001,Book,246,1984
1002,Book,842,Cryptonomicon
1003,Book,950,The Eye of the World
1004,Book,478,The Circle
1005,DVD,102,Terminator 2
1006,DVD,820,Die Hard 3
1007,DVD,152,Matrix
1008,DVD,927,Iron Man
1009,Video,832,Jurassic Park
1010,Video,112,Das Boot
1011,Video,862,Sharknado
1012,Video,347,Turtles
1013,Music,886,Shakira
1014,Music,454,Roland Kaiser
1015,Music,743,Snap
1016,Music,623,Dr.Alban
@@ -0,0 +1,33 @@
{
  "version": "1.0",
  "labelPropertyMap": [
    {
      "labels": [
        "Product"
      ],
      "properties": {
        "title": "STRING",
        "rank": "INTEGER",
        "category": "STRING"
      }
    },
    {
      "labels": [
        "Customer"
      ],
      "properties": {
        "name": "STRING"
      }
    }
  ],
  "relTypePropertyMap": [
    {
      "relType": "BOUGHT",
      "properties": {
        "rating": "INTEGER",
        "helpful": "INTEGER",
        "votes": "INTEGER"
      }
    }
  ]
}
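This schema matches the CSV files above: Customer rows read as id,name and Product rows as id,category,rank,title, joined by BOUGHT relationships carrying rating, helpful and votes. A minimal sketch of a query this graph can answer, assuming the source has been registered under the fs namespace as in GraphApp further down:

cypherSession.cypher(
  """
    |FROM GRAPH fs.products
    |MATCH (c:Customer)-[b:BOUGHT]->(p:Product)
    |RETURN c.name, p.title, b.rating""".stripMargin).show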
@@ -0,0 +1,8 @@
10001,2001,1001,7,4,10
10002,2001,1005,8,5,10
10003,2004,1009,7,4,10
10004,2004,1013,8,5,10
10005,2007,1002,7,4,10
10006,2007,1006,8,5,10
10007,2010,1003,7,4,10
10008,2010,1007,5,5,10
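Read against the schema, each BOUGHT row appears to be: relationship id, source customer id (2001-2012), target product id (1001-1016), then the rating, helpful and votes properties. A hedged sketch of loading the rows directly with Spark; the file path and column names here are illustrative, not taken from this PR:

// Illustrative path and column names, inferred from the schema above
val bought = spark.read.csv("/csv/products/relationships/BOUGHT.csv")
  .toDF("id", "source", "target", "rating", "helpful", "votes")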
@@ -37,6 +37,7 @@ import org.opencypher.okapi.impl.exception.UnsupportedOperationException
import org.opencypher.okapi.relational.api.graph.RelationalCypherSession
import org.opencypher.okapi.relational.api.planning.RelationalCypherResult
import org.opencypher.spark.api.io._
import org.opencypher.spark.impl.sparkgraph.SparkGraphSupport
import org.opencypher.spark.impl.graph.CAPSGraphFactory
import org.opencypher.spark.impl.table.SparkTable.DataFrameTable
import org.opencypher.spark.impl.{CAPSRecords, CAPSRecordsFactory}
@@ -51,7 +52,7 @@ import scala.reflect.runtime.universe._
 *
 * @param sparkSession The Spark session representing the cluster to execute on
 */
sealed class CAPSSession(val sparkSession: SparkSession) extends RelationalCypherSession[DataFrameTable] with SparkGraphSupport with Serializable {

  override type Result = RelationalCypherResult[DataFrameTable]

@@ -0,0 +1,33 @@
package org.opencypher.spark.impl.sparkgraph

import org.apache.spark.graph.api.NodeFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.opencypher.okapi.api.graph.Namespace
import org.opencypher.spark.api.{CAPSSession, GraphSources}

object GraphApp extends App {

  implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  spark.sparkContext.setLogLevel("error")

  // implicit val cypherSession = SparkCypherSession.create
  implicit val cypherSession: CAPSSession = CAPSSession.create

  // SPIP API: build a property graph from DataFrames and query it with Cypher
  val nodeData: DataFrame = spark.createDataFrame(Seq(0 -> "Alice", 1 -> "Bob")).toDF("id", "name")
  val nodeFrame: NodeFrame = NodeFrame(initialDf = nodeData, idColumn = "id", labelSet = Set("Person"))

  val graph = cypherSession.createGraph(Seq(nodeFrame))
  val result = graph.cypher("MATCH (n) RETURN n")
  result.df.show()

  // Okapi API: register a file-system graph source and query it directly
  cypherSession.registerSource(Namespace("fs"), GraphSources.fs(getClass.getResource("/csv").getFile).csv)
  cypherSession.cypher("FROM GRAPH fs.products MATCH (n) RETURN n").show
  cypherSession.cypher(
    """
      |FROM GRAPH fs.products
      |MATCH (n)
      |CONSTRUCT
      |  CREATE (n)
      |RETURN GRAPH""".stripMargin).graph.nodes("n").show
}
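GraphApp only exercises nodes through the SPIP API. A minimal sketch of adding relationships as well; the field names idColumn, sourceIdColumn, targetIdColumn and relationshipType come from the adapter code below, while the initialDf parameter name is an assumption by analogy with NodeFrame:

import org.apache.spark.graph.api.RelationshipFrame

// One KNOWS relationship from Alice (0) to Bob (1); ids match the node ids above
val relData: DataFrame = spark.createDataFrame(Seq((0, 0, 1))).toDF("id", "source", "target")
val relFrame: RelationshipFrame = RelationshipFrame(
  initialDf = relData,          // assumed parameter name, by analogy with NodeFrame
  idColumn = "id",
  sourceIdColumn = "source",
  targetIdColumn = "target",
  relationshipType = "KNOWS")

val graphWithRels = cypherSession.createGraph(Seq(nodeFrame), Seq(relFrame))
graphWithRels.cypher("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name").df.show()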
@@ -0,0 +1,64 @@
package org.opencypher.spark.impl.sparkgraph

import org.apache.spark.graph.api.{CypherResult, CypherSession, NodeFrame, PropertyGraph, RelationshipFrame}
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.opencypher.okapi.api.value.CypherValue.CypherMap
import org.opencypher.okapi.impl.exception.IllegalArgumentException
import org.opencypher.okapi.relational.api.graph.RelationalCypherGraph
import org.opencypher.okapi.relational.api.table.RelationalCypherRecords
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.impl.sparkgraph.adapters.RelationalGraphAdapter
import org.opencypher.spark.impl.table.SparkTable.DataFrameTable

trait SparkGraphSupport extends CypherSession {

  this: CAPSSession =>

  override def cypher(
    graph: PropertyGraph,
    query: String
  ): CypherResult = cypher(graph, query, Map.empty)

  override def cypher(
    graph: PropertyGraph,
    query: String,
    parameters: Map[String, Any]
  ): CypherResult = {
    val relationalGraph = toRelationalGraph(graph)
    SparkCypherResult(relationalGraph.cypher(query, CypherMap(parameters.toSeq: _*)).records)
  }

  override def createGraph(
    nodes: Seq[NodeFrame],
    relationships: Seq[RelationshipFrame]
  ): PropertyGraph = {
    require(nodes.groupBy(_.labelSet).forall(_._2.size == 1),
      "There can be at most one NodeFrame per label set")
    require(relationships.groupBy(_.relationshipType).forall(_._2.size == 1),
      "There can be at most one RelationshipFrame per relationship type")
    RelationalGraphAdapter(this, nodes, relationships)
  }

  // Loading and saving property graphs are not implemented yet
  override def load(path: String): PropertyGraph = ???

  override def save(
    graph: PropertyGraph,
    path: String,
    saveMode: SaveMode
  ): Unit = ???

  private def toRelationalGraph(graph: PropertyGraph): RelationalCypherGraph[DataFrameTable] = {
    graph match {
      case adapter: RelationalGraphAdapter => adapter.graph
      case other => throw IllegalArgumentException(
        expected = "A graph that has been created by `SparkCypherSession.createGraph`",
        actual = other.getClass.getSimpleName
      )
    }
  }
}

case class SparkCypherResult(relationalTable: RelationalCypherRecords[DataFrameTable]) extends CypherResult {
  override val df: DataFrame = relationalTable.table.df
}
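The three-argument cypher overload threads parameters through to Okapi as a CypherMap. A short usage sketch against the graph built in GraphApp ($name is standard Cypher parameter syntax):

val result = cypherSession.cypher(
  graph,
  "MATCH (n:Person) WHERE n.name = $name RETURN n",
  Map("name" -> "Alice"))
result.df.show()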
@@ -0,0 +1,25 @@
package org.opencypher.spark.impl.sparkgraph.adapters

import org.apache.spark.graph.api.{NodeFrame, RelationshipFrame}
import org.opencypher.okapi.api.io.conversion.{EntityMapping, NodeMappingBuilder, RelationshipMappingBuilder}

object MappingAdapter {

  implicit class RichNodeDataFrame(val nodeDf: NodeFrame) extends AnyVal {
    def toNodeMapping: EntityMapping = NodeMappingBuilder
      .on(nodeDf.idColumn)
      .withImpliedLabels(nodeDf.labelSet.toSeq: _*)
      .withPropertyKeyMappings(nodeDf.properties.toSeq: _*)
      .build
  }

  implicit class RichRelationshipDataFrame(val relDf: RelationshipFrame) extends AnyVal {
    def toRelationshipMapping: EntityMapping = RelationshipMappingBuilder
      .on(relDf.idColumn)
      .withSourceStartNodeKey(relDf.sourceIdColumn)
      .withSourceEndNodeKey(relDf.targetIdColumn)
      .withRelType(relDf.relationshipType)
      .withPropertyKeyMappings(relDf.properties.toSeq: _*)
      .build
  }
}
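These implicit classes let RelationalGraphAdapter turn SPIP frames into Okapi entity mappings with a plain method call, e.g.:

import org.opencypher.spark.impl.sparkgraph.adapters.MappingAdapter._

val nodeMapping = nodeFrame.toNodeMapping         // nodeFrame as in GraphApp
val relMapping = relFrame.toRelationshipMapping   // relFrame as in the sketch after GraphApp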
@@ -0,0 +1,72 @@
package org.opencypher.spark.impl.sparkgraph.adapters

import org.apache.spark.graph.api.{NodeFrame, PropertyGraph, PropertyGraphType, RelationshipFrame}
import org.apache.spark.sql.DataFrame
import org.opencypher.okapi.api.types.{CTNode, CTRelationship}
import org.opencypher.okapi.ir.api.expr.Var
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.CAPSEntityTable
import org.opencypher.spark.impl.sparkgraph.adapters.MappingAdapter._

case class RelationalGraphAdapter(
  cypherSession: CAPSSession,
  nodeFrames: Seq[NodeFrame],
  relationshipFrames: Seq[RelationshipFrame]) extends PropertyGraph {

  override def schema: PropertyGraphType = SchemaAdapter(graph.schema)

  private[spark] lazy val graph = {
    if (nodeFrames.isEmpty) {
      cypherSession.graphs.empty
    } else {
      val nodeTables = nodeFrames.map { nodeDataFrame => CAPSEntityTable(nodeDataFrame.toNodeMapping, nodeDataFrame.df) }
      val relTables = relationshipFrames.map { relDataFrame => CAPSEntityTable(relDataFrame.toRelationshipMapping, relDataFrame.df) }
      cypherSession.graphs.create(nodeTables.head, nodeTables.tail ++ relTables: _*)
    }
  }

  private lazy val _nodeFrame: Map[Set[String], NodeFrame] = nodeFrames.map(nf => nf.labelSet -> nf).toMap

  private lazy val _relationshipFrame: Map[String, RelationshipFrame] = relationshipFrames.map(rf => rf.relationshipType -> rf).toMap

  override def nodes: DataFrame = {
    // TODO: move to API as default implementation
    val nodeVar = Var("n")(CTNode)
    val nodes = graph.nodes(nodeVar.name)

    val df = nodes.table.df
    val header = nodes.header

    val idRename = header.column(nodeVar) -> "$ID"
    val labelRenames = header.labelsFor(nodeVar).map(hasLabel => header.column(hasLabel) -> s":${hasLabel.label.name}").toSeq.sortBy(_._2)
    val propertyRenames = header.propertiesFor(nodeVar).map(property => header.column(property) -> property.key.name).toSeq.sortBy(_._2)

    val selectColumns = (Seq(idRename) ++ labelRenames ++ propertyRenames).map { case (oldColumn, newColumn) => df.col(oldColumn).as(newColumn) }

    df.select(selectColumns: _*)
  }

  override def relationships: DataFrame = {
    // TODO: move to API as default implementation
    val relVar = Var("r")(CTRelationship)
    val rels = graph.relationships(relVar.name)

    val df = rels.table.df
    val header = rels.header

    val idRename = header.column(relVar) -> "$ID"
    val sourceIdRename = header.column(header.startNodeFor(relVar)) -> "$SOURCE_ID"
    val targetIdRename = header.column(header.endNodeFor(relVar)) -> "$TARGET_ID"
    val relTypeRenames = header.typesFor(relVar).map(hasType => header.column(hasType) -> s":${hasType.relType.name}").toSeq.sortBy(_._2)
    val propertyRenames = header.propertiesFor(relVar).map(property => header.column(property) -> property.key.name).toSeq.sortBy(_._2)

    val selectColumns = (Seq(idRename, sourceIdRename, targetIdRename) ++ relTypeRenames ++ propertyRenames).map { case (oldColumn, newColumn) => df.col(oldColumn).as(newColumn) }

    df.select(selectColumns: _*)
  }

  override def nodeFrame(labelSet: Set[String]): NodeFrame = _nodeFrame(labelSet)

  override def relationshipFrame(relationshipType: String): RelationshipFrame = _relationshipFrame(relationshipType)
}
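For the two-node Person graph from GraphApp, the nodes DataFrame produced here follows the canonical layout: a $ID column, one boolean :Label column per label, then the properties, with each group sorted by column name. A hedged sketch of the expected output (values illustrative; CAPS may re-encode ids internally):

graph.nodes.show()
// +---+-------+-----+
// |$ID|:Person| name|
// +---+-------+-----+
// |  0|   true|Alice|
// |  1|   true|  Bob|
// +---+-------+-----+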
@@ -0,0 +1,13 @@
package org.opencypher.spark.impl.sparkgraph.adapters

import org.apache.spark.graph.api.PropertyGraphType
import org.opencypher.okapi.api.schema.Schema

case class SchemaAdapter(schema: Schema) extends PropertyGraphType {

  override def labelSets: Set[Set[String]] = schema.labelCombinations.combos

  override def relationshipTypes: Set[String] = schema.relationshipTypes
}
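SchemaAdapter exposes the Okapi schema through the SPIP PropertyGraphType surface. For the graph built in GraphApp, a quick sanity check might look like:

graph.schema.labelSets          // Set(Set("Person"))
graph.schema.relationshipTypes  // empty until relationships are added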