Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ include(':okapi-relational')
include(':okapi-tck')
include(':okapi-trees')
include(':spark-cypher')
include(':spark-cypher-morpheus')
include(':spark-cypher-examples')
include(':spark-cypher-tck')
include(':spark-cypher-testing')
Expand Down
28 changes: 28 additions & 0 deletions spark-cypher-morpheus/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

// Human-readable description of this Gradle sub-project.
description = 'Spark Cypher Morpheus'

// Compile-time dependencies of the module.
dependencies {
// Sibling project from the same multi-project build.
compile project(':spark-cypher')
// External artifact pinned to a fixed snapshot version. The artifact name goes through the
// `.scala()` string helper, which is defined outside this file — presumably it appends the
// Scala binary version suffix (TODO confirm).
compile group: 'org.apache.spark', name: "spark-cypher".scala(), version: "3.0.0-SNAPSHOT"


// Disabled compileOnly dependencies on the Spark modules, kept for reference.
//    compileOnly group: 'org.apache.spark', name: "spark-core".scala(), version: ver.spark
//    compileOnly group: 'org.apache.spark', name: "spark-sql".scala(), version: ver.spark
//    compileOnly group: 'org.apache.spark', name: "spark-catalyst".scala(), version: ver.spark
}

// Shadow JAR task that bundles this module's compiled output together with its runtime
// configuration into a single archive carrying the 'all' classifier.
task allJar(type: ShadowJar) {
classifier = 'all'

// Include the main source set's own output.
from project.sourceSets.main.output
// Merge in everything resolved from the runtime configuration.
configurations = [project.configurations.runtime]

// Leave out dependencies from the Scala language and Scala modules groups.
dependencies {
exclude(dependency('org.scala-lang:'))
exclude(dependency('org.scala-lang.modules:'))
}
// Drop every file below META-INF/versions from the archive.
exclude "META-INF/versions/**/*"
}

// Add the task name to `pub.full.artifacts`; `pub` is defined outside this file —
// presumably it drives which artifacts get published (TODO confirm).
pub.full.artifacts += 'allJar'
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"tableStorageFormat" : "csv",
"tags" : [
0
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
2001,Alice
2002,Bob
2003,Eve
2004,Carol
2005,Carl
2006,Dave
2007,Mallory
2008,Trudy
2009,Trent
2010,Oscar
2011,Victor
2012,Peggy
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
1001,Book,246,1984
1002,Book,842,Cryptonomicon
1003,Book,950,The Eye of the World
1004,Book,478,The Circle
1005,DVD,102,Terminator 2
1006,DVD,820,Die Hard 3
1007,DVD,152,Matrix
1008,DVD,927,Iron Man
1009,Video,832,Jurassic Park
1010,Video,112,Das Boot
1011,Video,862,Sharknado
1012,Video,347,Turtles
1013,Music,886,Shakira
1014,Music,454,Roland Kaiser
1015,Music,743,Snap
1016,Music,623,Dr.Alban
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"version": "1.0",
"labelPropertyMap": [
{
"labels": [
"Product"
],
"properties": {
"title": "STRING",
"rank": "INTEGER",
"category": "STRING"
}
},
{
"labels": [
"Customer"
],
"properties": {
"name": "STRING"
}
}
],
"relTypePropertyMap": [
{
"relType": "BOUGHT",
"properties": {
"rating": "INTEGER",
"helpful": "INTEGER",
"votes": "INTEGER"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
10001,2001,1001,7,4,10
10002,2001,1005,8,5,10
10003,2004,1009,7,4,10
10004,2004,1013,8,5,10
10005,2007,1002,7,4,10
10006,2007,1006,8,5,10
10007,2010,1003,7,4,10
10008,2010,1007,5,5,10
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.opencypher.morpheus

import org.apache.spark.graph.api.NodeFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.opencypher.okapi.api.graph.Namespace
import org.opencypher.spark.api.GraphSources

/**
  * Small demo application that exercises a [[MorpheusSession]] through two entry points:
  * the Spark graph API (`NodeFrame` / `createGraph` / `cypher`) and the okapi API
  * (registering a file-system graph source and running multi-graph queries against it).
  */
object GraphApp extends App {
  // Implicit vals carry explicit types so implicit resolution does not depend on type inference.
  implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  spark.sparkContext.setLogLevel("error")

  //  implicit val cypherSession = SparkCypherSession.create
  implicit val cypherSession: MorpheusSession = MorpheusSession.create

  // SPIP API
  val nodeData: DataFrame = spark.createDataFrame(Seq(0 -> "Alice", 1 -> "Bob")).toDF("id", "name")
  val nodeFrame: NodeFrame = NodeFrame(initialDf = nodeData, idColumn = "id", labelSet = Set("Person"))

  val graph = cypherSession.createGraph(Seq(nodeFrame))
  val result = graph.cypher("MATCH (n) RETURN n")
  result.df.show()

  // Okapi API

  // CAPSSession needs to be in implicit scope for PGDSs etc.
  implicit val caps: org.opencypher.spark.api.CAPSSession = cypherSession.caps

  // `getResource` returns null when the resource is absent; fail with a descriptive message
  // instead of a bare NullPointerException.
  private val csvRoot: String = Option(getClass.getResource("/csv"))
    .map(_.getFile)
    .getOrElse(sys.error("Resource directory `/csv` was not found on the classpath"))

  cypherSession.registerSource(Namespace("fs"), GraphSources.fs(csvRoot).csv)
  cypherSession.cypher("FROM GRAPH fs.products MATCH (n) RETURN n").show
  cypherSession.cypher(
    """
      |FROM GRAPH fs.products
      |MATCH (n)
      |CONSTRUCT
      | CREATE (n)
      |RETURN GRAPH""".stripMargin).graph.nodes("n").show
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.opencypher.morpheus

import org.apache.spark.graph.api._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.opencypher.morpheus.adapters.RelationalGraphAdapter
import org.opencypher.okapi.api.value.CypherValue.CypherMap
import org.opencypher.okapi.impl.exception.IllegalArgumentException
import org.opencypher.okapi.relational.api.graph.{RelationalCypherGraph, RelationalCypherSession}
import org.opencypher.okapi.relational.api.table.RelationalCypherRecords
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.CAPSEntityTableFactory
import org.opencypher.spark.impl.CAPSRecordsFactory
import org.opencypher.spark.impl.graph.CAPSGraphFactory
import org.opencypher.spark.impl.table.SparkTable.DataFrameTable

/** Factory for [[MorpheusSession]] instances. */
object MorpheusSession {

  /**
    * Creates a new session on top of the given Spark session.
    *
    * @param spark Spark session, taken from implicit scope
    * @return a fresh [[MorpheusSession]] wrapping `spark`
    */
  def create(implicit spark: SparkSession): MorpheusSession = {
    val session = new MorpheusSession(spark)
    session
  }
}

/**
  * Cypher result backed by relational records.
  *
  * @param relationalTable records produced by a query; their table supplies the result DataFrame
  */
case class SparkCypherResult(relationalTable: RelationalCypherRecords[DataFrameTable]) extends CypherResult {

  /** DataFrame held by the table of the wrapped records. */
  override val df: DataFrame = {
    val resultTable = relationalTable.table
    resultTable.df
  }
}

/**
  * Cypher session that is both an okapi [[RelationalCypherSession]] and a Spark graph API
  * `CypherSession`. The relational members are forwarded to an internally created [[CAPSSession]].
  *
  * @param sparkSession Spark session this Cypher session runs on
  */
private[morpheus] class MorpheusSession(override val sparkSession: SparkSession) extends RelationalCypherSession[DataFrameTable] with CypherSession {

  /** CAPS session built on the same Spark session; exposed implicitly for okapi APIs. */
  implicit val caps: CAPSSession = new CAPSSession(sparkSession)

  // org.opencypher.okapi.relational.api.graph.RelationalCypherSession

  override type Records = caps.Records

  override val records: CAPSRecordsFactory = caps.records

  override val graphs: CAPSGraphFactory = caps.graphs

  override val entityTables: CAPSEntityTableFactory.type = caps.entityTables

  // org.apache.spark.graph.api.CypherSession

  /** Runs `query` on `graph` without parameters. */
  override def cypher(
    graph: PropertyGraph,
    query: String
  ): CypherResult = cypher(graph, query, Map.empty)

  /**
    * Runs `query` on `graph` with the given parameters.
    *
    * @param graph      a graph created by this session's `createGraph`
    * @param query      Cypher query text
    * @param parameters query parameters, converted to a [[CypherMap]]
    * @return the query's records wrapped in a [[SparkCypherResult]]
    */
  override def cypher(
    graph: PropertyGraph,
    query: String,
    parameters: Map[String, Any]
  ): CypherResult = {
    val relationalGraph = toRelationalGraph(graph)
    SparkCypherResult(relationalGraph.cypher(query, CypherMap(parameters.toSeq: _*)).records)
  }

  /**
    * Creates a property graph from node and relationship frames.
    *
    * Fails the `require` checks when two node frames share a label set or two relationship
    * frames share a relationship type.
    */
  override def createGraph(
    nodes: Seq[NodeFrame],
    relationships: Seq[RelationshipFrame]
  ): PropertyGraph = {
    require(nodes.groupBy(_.labelSet).forall(_._2.size == 1),
      "There can be at most one NodeFrame per label set")
    require(relationships.groupBy(_.relationshipType).forall(_._2.size == 1),
      "There can be at most one RelationshipFrame per relationship type")
    RelationalGraphAdapter(this, nodes, relationships)
  }

  /** Not implemented yet; calling it throws `NotImplementedError`. */
  override def load(path: String): PropertyGraph = ???

  /** Not implemented yet; calling it throws `NotImplementedError`. */
  override def save(
    graph: PropertyGraph,
    path: String,
    saveMode: SaveMode
  ): Unit = ???

  /**
    * Unwraps the relational graph behind a [[RelationalGraphAdapter]].
    *
    * Any other `PropertyGraph` implementation is rejected with an okapi
    * `IllegalArgumentException`.
    */
  private def toRelationalGraph(graph: PropertyGraph): RelationalCypherGraph[DataFrameTable] = {
    graph match {
      case adapter: RelationalGraphAdapter => adapter.graph
      case other => throw IllegalArgumentException(
        // The message names this session type; it previously referred to `SparkCypherSession`.
        expected = "A graph that has been created by `MorpheusSession.createGraph`",
        actual = other.getClass.getSimpleName
      )
    }
  }
}




Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opencypher.morpheus.adapters

import org.apache.spark.graph.api.{NodeFrame, RelationshipFrame}
import org.opencypher.okapi.api.io.conversion.{EntityMapping, NodeMappingBuilder, RelationshipMappingBuilder}

/** Extension methods that turn Spark graph API frames into okapi entity mappings. */
object MappingAdapter {

  implicit class RichNodeDataFrame(val nodeDf: NodeFrame) extends AnyVal {

    /** Builds a node mapping from the frame's id column, label set and property mappings. */
    def toNodeMapping: EntityMapping = {
      val labels = nodeDf.labelSet.toSeq
      val propertyMappings = nodeDf.properties.toSeq

      NodeMappingBuilder
        .on(nodeDf.idColumn)
        .withImpliedLabels(labels: _*)
        .withPropertyKeyMappings(propertyMappings: _*)
        .build
    }
  }

  implicit class RichRelationshipDataFrame(val relDf: RelationshipFrame) extends AnyVal {

    /**
      * Builds a relationship mapping from the frame's id, source id and target id columns,
      * its relationship type and its property mappings.
      */
    def toRelationshipMapping: EntityMapping = {
      val propertyMappings = relDf.properties.toSeq

      RelationshipMappingBuilder
        .on(relDf.idColumn)
        .withSourceStartNodeKey(relDf.sourceIdColumn)
        .withSourceEndNodeKey(relDf.targetIdColumn)
        .withRelType(relDf.relationshipType)
        .withPropertyKeyMappings(propertyMappings: _*)
        .build
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.opencypher.morpheus.adapters

import org.apache.spark.cypher.adapters.MappingAdapter._
import org.apache.spark.graph.api.{NodeFrame, PropertyGraph, PropertyGraphType, RelationshipFrame}
import org.apache.spark.sql.DataFrame
import org.opencypher.morpheus.MorpheusSession
import org.opencypher.okapi.api.types.{CTNode, CTRelationship}
import org.opencypher.okapi.ir.api.expr.Var
import org.opencypher.spark.api.io.CAPSEntityTable

/**
  * `PropertyGraph` implementation that wraps a relational graph built from the given frames.
  *
  * @param cypherSession      session used to create the underlying relational graph
  * @param nodeFrames         node frames of the graph
  * @param relationshipFrames relationship frames of the graph
  */
case class RelationalGraphAdapter(
  cypherSession: MorpheusSession,
  nodeFrames: Seq[NodeFrame],
  relationshipFrames: Seq[RelationshipFrame]) extends PropertyGraph {

  override def schema: PropertyGraphType = SchemaAdapter(graph.schema)

  // Built on first access: the session's empty graph when there are no node frames,
  // otherwise a graph created from one entity table per frame.
  private[morpheus] lazy val graph = {
    if (nodeFrames.nonEmpty) {
      val nodeTables = nodeFrames.map(frame => CAPSEntityTable(frame.toNodeMapping, frame.df))
      val relTables = relationshipFrames.map(frame => CAPSEntityTable(frame.toRelationshipMapping, frame.df))
      cypherSession.graphs.create(nodeTables.head, nodeTables.tail ++ relTables: _*)
    } else {
      cypherSession.graphs.empty
    }
  }

  // Node frames keyed by their label set.
  private lazy val _nodeFrame: Map[Set[String], NodeFrame] =
    nodeFrames.map(frame => (frame.labelSet, frame)).toMap

  // Relationship frames keyed by their relationship type.
  private lazy val _relationshipFrame: Map[String, RelationshipFrame] =
    relationshipFrames.map(frame => (frame.relationshipType, frame)).toMap

  /**
    * All nodes as a DataFrame with an `$ID` column, followed by one `:<label>` column per label
    * and one column per property key; label and property columns are each sorted by name.
    */
  override def nodes: DataFrame = {
    // TODO: move to API as default implementation
    val nodeVar = Var("n")(CTNode)
    val nodeRecords = graph.nodes(nodeVar.name)
    val header = nodeRecords.header

    val idAlias = header.column(nodeVar) -> "$ID"
    val labelAliases = header.labelsFor(nodeVar)
      .map(hasLabel => header.column(hasLabel) -> s":${hasLabel.label.name}")
      .toSeq
      .sortBy { case (_, alias) => alias }
    val propertyAliases = header.propertiesFor(nodeVar)
      .map(property => header.column(property) -> property.key.name)
      .toSeq
      .sortBy { case (_, alias) => alias }

    selectWithAliases(nodeRecords.table.df, idAlias +: (labelAliases ++ propertyAliases))
  }

  /**
    * All relationships as a DataFrame with `$ID`, `$SOURCE_ID` and `$TARGET_ID` columns, followed
    * by one `:<type>` column per relationship type and one column per property key; type and
    * property columns are each sorted by name.
    */
  override def relationships: DataFrame = {
    // TODO: move to API as default implementation
    val relVar = Var("r")(CTRelationship)
    val relRecords = graph.relationships(relVar.name)
    val header = relRecords.header

    val fixedAliases = Seq(
      header.column(relVar) -> "$ID",
      header.column(header.startNodeFor(relVar)) -> "$SOURCE_ID",
      header.column(header.endNodeFor(relVar)) -> "$TARGET_ID"
    )
    val typeAliases = header.typesFor(relVar)
      .map(hasType => header.column(hasType) -> s":${hasType.relType.name}")
      .toSeq
      .sortBy { case (_, alias) => alias }
    val propertyAliases = header.propertiesFor(relVar)
      .map(property => header.column(property) -> property.key.name)
      .toSeq
      .sortBy { case (_, alias) => alias }

    selectWithAliases(relRecords.table.df, fixedAliases ++ typeAliases ++ propertyAliases)
  }

  override def nodeFrame(labelSet: Set[String]): NodeFrame = _nodeFrame(labelSet)

  override def relationshipFrame(relationshipType: String): RelationshipFrame = _relationshipFrame(relationshipType)

  // Selects the given columns from `df`, each renamed to its alias, in the given order.
  private def selectWithAliases(df: DataFrame, aliases: Seq[(String, String)]): DataFrame = {
    val columns = aliases.map { case (column, alias) => df.col(column).as(alias) }
    df.select(columns: _*)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opencypher.morpheus.adapters

import org.apache.spark.graph.api.PropertyGraphType
import org.opencypher.okapi.api.schema.Schema

/**
  * Exposes an okapi [[Schema]] through the Spark graph API's `PropertyGraphType`.
  *
  * @param schema the wrapped okapi schema
  */
case class SchemaAdapter(schema: Schema) extends PropertyGraphType {

  /** The label combinations of the wrapped schema. */
  override def labelSets: Set[Set[String]] = {
    val combinations = schema.labelCombinations
    combinations.combos
  }

  /** The relationship types of the wrapped schema. */
  override def relationshipTypes: Set[String] = {
    val relTypes: Set[String] = schema.relationshipTypes
    relTypes
  }

}