diff --git a/settings.gradle b/settings.gradle
index 27dc621740..702d844a58 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -9,6 +9,7 @@ include(':okapi-relational')
 include(':okapi-tck')
 include(':okapi-trees')
 include(':spark-cypher')
+include(':spark-cypher-morpheus')
 include(':spark-cypher-examples')
 include(':spark-cypher-tck')
 include(':spark-cypher-testing')
diff --git a/spark-cypher-morpheus/build.gradle b/spark-cypher-morpheus/build.gradle
new file mode 100644
index 0000000000..b42da2e2dd
--- /dev/null
+++ b/spark-cypher-morpheus/build.gradle
@@ -0,0 +1,28 @@
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+description = 'Spark Cypher Morpheus'
+
+dependencies {
+    compile project(':spark-cypher')
+    compile group: 'org.apache.spark', name: "spark-cypher".scala(), version: "3.0.0-SNAPSHOT"
+
+
+//    compileOnly group: 'org.apache.spark', name: "spark-core".scala(), version: ver.spark
+//    compileOnly group: 'org.apache.spark', name: "spark-sql".scala(), version: ver.spark
+//    compileOnly group: 'org.apache.spark', name: "spark-catalyst".scala(), version: ver.spark
+}
+
+task allJar(type: ShadowJar) {
+    classifier = 'all'
+
+    from project.sourceSets.main.output
+    configurations = [project.configurations.runtime]
+
+    dependencies {
+        exclude(dependency('org.scala-lang:'))
+        exclude(dependency('org.scala-lang.modules:'))
+    }
+    exclude "META-INF/versions/**/*"
+}
+
+pub.full.artifacts += 'allJar'
diff --git a/spark-cypher-morpheus/src/main/resources/csv/products/capsGraphMetaData.json b/spark-cypher-morpheus/src/main/resources/csv/products/capsGraphMetaData.json
new file mode 100644
index 0000000000..5ee3e02906
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/resources/csv/products/capsGraphMetaData.json
@@ -0,0 +1,6 @@
+{
+  "tableStorageFormat" : "csv",
+  "tags" : [
+    0
+  ]
+}
diff --git a/spark-cypher-morpheus/src/main/resources/csv/products/nodes/Customer/table.csv b/spark-cypher-morpheus/src/main/resources/csv/products/nodes/Customer/table.csv
new file mode 100644
index 0000000000..83091d43fb
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/resources/csv/products/nodes/Customer/table.csv
@@ -0,0 +1,12 @@
+2001,Alice
+2002,Bob
+2003,Eve
+2004,Carol
+2005,Carl
+2006,Dave
+2007,Mallory
+2008,Trudy
+2009,Trent
+2010,Oscar
+2011,Victor
+2012,Peggy
diff --git a/spark-cypher-morpheus/src/main/resources/csv/products/nodes/Product/table.csv b/spark-cypher-morpheus/src/main/resources/csv/products/nodes/Product/table.csv
new file mode 100644
index 0000000000..799ff56f03
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/resources/csv/products/nodes/Product/table.csv
@@ -0,0 +1,16 @@
+1001,Book,246,1984
+1002,Book,842,Cryptonomicon
+1003,Book,950,The Eye of the World
+1004,Book,478,The Circle
+1005,DVD,102,Terminator 2
+1006,DVD,820,Die Hard 3
+1007,DVD,152,Matrix
+1008,DVD,927,Iron Man
+1009,Video,832,Jurassic Park
+1010,Video,112,Das Boot
+1011,Video,862,Sharknado
+1012,Video,347,Turtles
+1013,Music,886,Shakira
+1014,Music,454,Roland Kaiser
+1015,Music,743,Snap
+1016,Music,623,Dr.Alban
diff --git a/spark-cypher-morpheus/src/main/resources/csv/products/propertyGraphSchema.json b/spark-cypher-morpheus/src/main/resources/csv/products/propertyGraphSchema.json
new file mode 100644
index 0000000000..a79f359a29
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/resources/csv/products/propertyGraphSchema.json
@@ -0,0 +1,33 @@
+{
+  "version": "1.0",
+  "labelPropertyMap": [
+    {
+      "labels": [
+        "Product"
+      ],
+      "properties": {
+        "title": "STRING",
+        "rank": "INTEGER",
+        "category": "STRING"
+      }
+    },
+    {
+      "labels": [
+        "Customer"
+      ],
+      "properties": {
+        "name": "STRING"
+      }
+    }
+  ],
+  "relTypePropertyMap": [
+    {
+      "relType": "BOUGHT",
+      "properties": {
+        "rating": "INTEGER",
+        "helpful": "INTEGER",
+        "votes": "INTEGER"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/spark-cypher-morpheus/src/main/resources/csv/products/relationships/BOUGHT/table.csv b/spark-cypher-morpheus/src/main/resources/csv/products/relationships/BOUGHT/table.csv
new file mode 100644
index 0000000000..b131a426e5
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/resources/csv/products/relationships/BOUGHT/table.csv
@@ -0,0 +1,8 @@
+10001,2001,1001,7,4,10
+10002,2001,1005,8,5,10
+10003,2004,1009,7,4,10
+10004,2004,1013,8,5,10
+10005,2007,1002,7,4,10
+10006,2007,1006,8,5,10
+10007,2010,1003,7,4,10
+10008,2010,1007,5,5,10
diff --git a/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/GraphApp.scala b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/GraphApp.scala
new file mode 100644
index 0000000000..3b042fbe2c
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/GraphApp.scala
@@ -0,0 +1,44 @@
+package org.opencypher.morpheus
+
+import org.apache.spark.graph.api.NodeFrame
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.opencypher.okapi.api.graph.Namespace
+import org.opencypher.spark.api.{CAPSSession, GraphSources}
+
+/**
+  * Demo application that runs Cypher through a [[MorpheusSession]] in two ways: via the
+  * Spark graph API (`createGraph` / `graph.cypher`) and via the Okapi API (a registered
+  * graph source queried with `FROM GRAPH` and `CONSTRUCT`).
+  */
+object GraphApp extends App {
+  // Implicit vals carry explicit types so implicit resolution does not rely on type inference.
+  implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
+  spark.sparkContext.setLogLevel("error")
+
+//  implicit val cypherSession = SparkCypherSession.create
+  implicit val cypherSession: MorpheusSession = MorpheusSession.create
+
+  // SPIP API
+  val nodeData: DataFrame = spark.createDataFrame(Seq(0 -> "Alice", 1 -> "Bob")).toDF("id", "name")
+  val nodeFrame: NodeFrame = NodeFrame(initialDf = nodeData, idColumn = "id", labelSet = Set("Person"))
+
+  val graph = cypherSession.createGraph(Seq(nodeFrame))
+  val result = graph.cypher("MATCH (n) RETURN n")
+  result.df.show()
+
+  // Okapi API
+
+  // CAPSSession needs to be in implicit scope for PGDSs etc.
+  implicit val caps: CAPSSession = cypherSession.caps
+
+  // Registers the bundled `/csv` resource directory as graph source `fs`.
+  cypherSession.registerSource(Namespace("fs"), GraphSources.fs(getClass.getResource("/csv").getFile).csv)
+  cypherSession.cypher("FROM GRAPH fs.products MATCH (n) RETURN n").show
+  cypherSession.cypher(
+    """
+      |FROM GRAPH fs.products
+      |MATCH (n)
+      |CONSTRUCT
+      |  CREATE (n)
+      |RETURN GRAPH""".stripMargin).graph.nodes("n").show
+}
diff --git a/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/MorpheusSession.scala b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/MorpheusSession.scala
new file mode 100644
index 0000000000..cd45753fb3
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/MorpheusSession.scala
@@ -0,0 +1,106 @@
+package org.opencypher.morpheus
+
+import org.apache.spark.graph.api._
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.opencypher.morpheus.adapters.RelationalGraphAdapter
+import org.opencypher.okapi.api.value.CypherValue.CypherMap
+import org.opencypher.okapi.impl.exception.IllegalArgumentException
+import org.opencypher.okapi.relational.api.graph.{RelationalCypherGraph, RelationalCypherSession}
+import org.opencypher.okapi.relational.api.table.RelationalCypherRecords
+import org.opencypher.spark.api.CAPSSession
+import org.opencypher.spark.api.io.CAPSEntityTableFactory
+import org.opencypher.spark.impl.CAPSRecordsFactory
+import org.opencypher.spark.impl.graph.CAPSGraphFactory
+import org.opencypher.spark.impl.table.SparkTable.DataFrameTable
+
+object MorpheusSession {
+
+  /** Creates a [[MorpheusSession]] backed by the implicitly available Spark session. */
+  def create(implicit spark: SparkSession): MorpheusSession = new MorpheusSession(spark)
+}
+
+/** Query result whose `df` is the DataFrame underlying the given relational records. */
+case class SparkCypherResult(relationalTable: RelationalCypherRecords[DataFrameTable]) extends CypherResult {
+  override val df: DataFrame = relationalTable.table.df
+}
+
+/**
+  * Session that implements both Okapi's `RelationalCypherSession` and the Spark graph API's
+  * `CypherSession`. The Okapi-side factories are taken from an internally created [[CAPSSession]].
+  */
+private[morpheus] class MorpheusSession(override val sparkSession: SparkSession) extends RelationalCypherSession[DataFrameTable] with CypherSession {
+
+  implicit val caps: CAPSSession = new CAPSSession(sparkSession)
+
+  // org.opencypher.okapi.relational.api.graph.RelationalCypherSession
+
+  override type Records = caps.Records
+
+  override val records: CAPSRecordsFactory = caps.records
+
+  override val graphs: CAPSGraphFactory = caps.graphs
+
+  override val entityTables: CAPSEntityTableFactory.type = caps.entityTables
+
+  // org.apache.spark.graph.api.CypherSession
+
+  /** Runs `query` on `graph` with an empty parameter map. */
+  override def cypher(
+    graph: PropertyGraph,
+    query: String
+  ): CypherResult = cypher(graph, query, Map.empty)
+
+  /**
+    * Runs `query` on `graph` with the given parameters.
+    *
+    * `graph` has to be a [[RelationalGraphAdapter]] (as returned by `createGraph`);
+    * any other `PropertyGraph` implementation is rejected with an `IllegalArgumentException`.
+    */
+  override def cypher(
+    graph: PropertyGraph,
+    query: String,
+    parameters: Map[String, Any]
+  ): CypherResult = {
+    val relationalGraph = toRelationalGraph(graph)
+    SparkCypherResult(relationalGraph.cypher(query, CypherMap(parameters.toSeq: _*)).records)
+  }
+
+  /**
+    * Wraps the given frames in a [[RelationalGraphAdapter]].
+    *
+    * The `require` checks fail if two node frames share a label set or two relationship
+    * frames share a relationship type.
+    */
+  override def createGraph(
+    nodes: Seq[NodeFrame],
+    relationships: Seq[RelationshipFrame]
+  ): PropertyGraph = {
+    require(nodes.groupBy(_.labelSet).forall(_._2.size == 1),
+      "There can be at most one NodeFrame per label set")
+    require(relationships.groupBy(_.relationshipType).forall(_._2.size == 1),
+      "There can be at most one RelationshipFrame per relationship type")
+    RelationalGraphAdapter(this, nodes, relationships)
+  }
+
+  // Not implemented yet (`???` throws NotImplementedError).
+  override def load(path: String): PropertyGraph = ???
+
+  // Not implemented yet (`???` throws NotImplementedError).
+  override def save(
+    graph: PropertyGraph,
+    path: String,
+    saveMode: SaveMode
+  ): Unit = ???
+
+  /** Returns the relational graph behind a [[RelationalGraphAdapter]]; rejects any other graph type. */
+  private def toRelationalGraph(graph: PropertyGraph): RelationalCypherGraph[DataFrameTable] = {
+    graph match {
+      case adapter: RelationalGraphAdapter => adapter.graph
+      case other => throw IllegalArgumentException(
+        // Name this session type, since its `createGraph` is what produces the expected adapter.
+        expected = "A graph that has been created by `MorpheusSession.createGraph`",
+        actual = other.getClass.getSimpleName
+      )
+    }
+  }
+}
diff --git a/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/MappingAdapter.scala b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/MappingAdapter.scala
new file mode 100644
index 0000000000..ec40f19841
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/MappingAdapter.scala
@@ -0,0 +1,28 @@
+package org.opencypher.morpheus.adapters
+
+import org.apache.spark.graph.api.{NodeFrame, RelationshipFrame}
+import org.opencypher.okapi.api.io.conversion.{EntityMapping, NodeMappingBuilder, RelationshipMappingBuilder}
+
+/** Implicit extensions that turn Spark graph frames into Okapi entity mappings. */
+object MappingAdapter {
+
+  implicit class RichNodeDataFrame(val nodeDf: NodeFrame) extends AnyVal {
+    /** Node mapping built from the frame's id column, label set and properties. */
+    def toNodeMapping: EntityMapping = NodeMappingBuilder
+      .on(nodeDf.idColumn)
+      .withImpliedLabels(nodeDf.labelSet.toSeq: _*)
+      .withPropertyKeyMappings(nodeDf.properties.toSeq: _*)
+      .build
+  }
+
+  implicit class RichRelationshipDataFrame(val relDf: RelationshipFrame) extends AnyVal {
+    /** Relationship mapping built from the frame's id, source id and target id columns, type and properties. */
+    def toRelationshipMapping: EntityMapping = RelationshipMappingBuilder
+      .on(relDf.idColumn)
+      .withSourceStartNodeKey(relDf.sourceIdColumn)
+      .withSourceEndNodeKey(relDf.targetIdColumn)
+      .withRelType(relDf.relationshipType)
+      .withPropertyKeyMappings(relDf.properties.toSeq: _*)
+      .build
+  }
+}
diff --git a/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/RelationalGraphAdapter.scala b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/RelationalGraphAdapter.scala
new file mode 100644
index 0000000000..086796357c
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/RelationalGraphAdapter.scala
@@ -0,0 +1,88 @@
+package org.opencypher.morpheus.adapters
+
+import org.apache.spark.graph.api.{NodeFrame, PropertyGraph, PropertyGraphType, RelationshipFrame}
+import org.apache.spark.sql.DataFrame
+import org.opencypher.morpheus.MorpheusSession
+import org.opencypher.morpheus.adapters.MappingAdapter._
+import org.opencypher.okapi.api.types.{CTNode, CTRelationship}
+import org.opencypher.okapi.ir.api.expr.Var
+import org.opencypher.spark.api.io.CAPSEntityTable
+
+/**
+  * `PropertyGraph` implementation backed by a relational graph that is lazily built from the
+  * given node and relationship frames.
+  */
+case class RelationalGraphAdapter(
+  cypherSession: MorpheusSession,
+  nodeFrames: Seq[NodeFrame],
+  relationshipFrames: Seq[RelationshipFrame]) extends PropertyGraph {
+
+  override def schema: PropertyGraphType = SchemaAdapter(graph.schema)
+
+  // NOTE: without node frames the empty graph is used and `relationshipFrames` is not consulted.
+  private[morpheus] lazy val graph = {
+    if (nodeFrames.isEmpty) {
+      cypherSession.graphs.empty
+    } else {
+      val nodeTables = nodeFrames.map { nodeDataFrame => CAPSEntityTable(nodeDataFrame.toNodeMapping, nodeDataFrame.df) }
+      val relTables = relationshipFrames.map { relDataFrame => CAPSEntityTable(relDataFrame.toRelationshipMapping, relDataFrame.df) }
+      cypherSession.graphs.create(nodeTables.head, nodeTables.tail ++ relTables: _*)
+    }
+  }
+
+  private lazy val _nodeFrame: Map[Set[String], NodeFrame] = nodeFrames.map(nf => nf.labelSet -> nf).toMap
+
+  private lazy val _relationshipFrame: Map[String, RelationshipFrame] = relationshipFrames.map(rf => rf.relationshipType -> rf).toMap
+
+  // All nodes as one DataFrame. Column order: "$ID", then one ":<label>" column per label,
+  // then one column per property key; label and property columns are each sorted by name.
+  // (Plain comment rather than Scaladoc: "$ID" would be read as a Scaladoc variable.)
+  override def nodes: DataFrame = {
+    // TODO: move to API as default implementation
+    val nodeVar = Var("n")(CTNode)
+    val nodes = graph.nodes(nodeVar.name)
+
+    val df = nodes.table.df
+    val header = nodes.header
+
+    val idRename = header.column(nodeVar) -> "$ID"
+    val labelRenames = header.labelsFor(nodeVar).map(hasLabel => header.column(hasLabel) -> s":${hasLabel.label.name}").toSeq.sortBy(_._2)
+    val propertyRenames = header.propertiesFor(nodeVar).map(property => header.column(property) -> property.key.name).toSeq.sortBy(_._2)
+
+    val selectColumns = (Seq(idRename) ++ labelRenames ++ propertyRenames).map { case (oldColumn, newColumn) => df.col(oldColumn).as(newColumn) }
+
+    df.select(selectColumns: _*)
+  }
+
+  // All relationships as one DataFrame. Column order: "$ID", "$SOURCE_ID", "$TARGET_ID", then
+  // one ":<type>" column per relationship type, then one column per property key; type and
+  // property columns are each sorted by name.
+  override def relationships: DataFrame = {
+    // TODO: move to API as default implementation
+    val relVar = Var("r")(CTRelationship)
+    val rels = graph.relationships(relVar.name)
+
+    val df = rels.table.df
+    val header = rels.header
+
+    val idRename = header.column(relVar) -> "$ID"
+    val sourceIdRename = header.column(header.startNodeFor(relVar)) -> "$SOURCE_ID"
+    val targetIdRename = header.column(header.endNodeFor(relVar)) -> "$TARGET_ID"
+    val relTypeRenames = header.typesFor(relVar).map(hasType => header.column(hasType) -> s":${hasType.relType.name}").toSeq.sortBy(_._2)
+    val propertyRenames = header.propertiesFor(relVar).map(property => header.column(property) -> property.key.name).toSeq.sortBy(_._2)
+
+    val selectColumns = (Seq(idRename, sourceIdRename, targetIdRename) ++ relTypeRenames ++ propertyRenames).map { case (oldColumn, newColumn) => df.col(oldColumn).as(newColumn) }
+
+    df.select(selectColumns: _*)
+  }
+
+  /** Looks up the node frame registered for exactly this label set; throws `NoSuchElementException` if there is none. */
+  override def nodeFrame(labelSet: Set[String]): NodeFrame = _nodeFrame(labelSet)
+
+  /** Looks up the relationship frame registered for this type; throws `NoSuchElementException` if there is none. */
+  override def relationshipFrame(relationshipType: String): RelationshipFrame = _relationshipFrame(relationshipType)
+
+}
diff --git a/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/SchemaAdapter.scala b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/SchemaAdapter.scala
new file mode 100644
index 0000000000..ddec8095ec
--- /dev/null
+++ b/spark-cypher-morpheus/src/main/scala/org/opencypher/morpheus/adapters/SchemaAdapter.scala
@@ -0,0 +1,12 @@
+package org.opencypher.morpheus.adapters
+
+import org.apache.spark.graph.api.PropertyGraphType
+import org.opencypher.okapi.api.schema.Schema
+
+/** Exposes an Okapi [[Schema]] through the Spark graph API's `PropertyGraphType`. */
+case class SchemaAdapter(schema: Schema) extends PropertyGraphType {
+
+  override def labelSets: Set[Set[String]] = schema.labelCombinations.combos
+
+  override def relationshipTypes: Set[String] = schema.relationshipTypes
+}