Improve docs to help debugging #28

Merged 13 commits on Feb 5, 2024
5 changes: 4 additions & 1 deletion .gitignore
@@ -35,4 +35,7 @@ target/

# node modules
node_modules/
.env

# IDE
.vscode/
7 changes: 6 additions & 1 deletion modules/ROOT/pages/faq.adoc
@@ -81,13 +81,18 @@ We are working to fully support all the save modes on Spark 3.

== I am getting errors _NoClassDefFoundError_ or _ClassNotFoundException_. What should I do?

You may get one of the following errors:

----
NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
Caused by: ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
----

----
java.lang.NoClassDefFoundError: scala/collection/IterableOnce
Caused by: java.lang.ClassNotFoundException: scala.collection.IterableOnce
----

This means that the Spark or Scala version in your environment doesn't match the versions supported by the connector.
Refer to xref:overview.adoc#_spark_and_scala_compatibility[this page] to find out which versions you need.
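
As a quick sanity check (a minimal sketch, assuming you can run a Spark shell or job in the same environment), you can print the Spark and Scala versions in use and compare them against the supported versions:

[source, scala]
----
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Compare these against the versions supported by the connector build you are using.
println(s"Spark version: ${spark.version}")
println(s"Scala version: ${scala.util.Properties.versionString}")
----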

5 changes: 5 additions & 0 deletions modules/ROOT/pages/reading.adoc
Original file line number Diff line number Diff line change
@@ -606,3 +606,8 @@ Where:
* `<target_labels>` is the list of labels provided by `relationship.target.labels` option
* `<relationship>` is the list of labels provided by `relationship` option
* `<limit>` is the value provided via `schema.flatten.limit`

=== Performance considerations

If the schema is not specified, the Spark Connector uses sampling as explained xref:quickstart.adoc#_schema[here] and xref:architecture.adoc#_schema_considerations[here].
Since sampling is potentially an expensive operation, consider xref:quickstart.adoc#user-defined-schema[supplying your own schema].
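
As an illustrative sketch of what that might look like (the `Person` label, the `name` and `age` properties, and the connection details are assumptions for this example; see the linked quickstart section for the exact requirements), a user-defined schema can be passed to the reader so that sampling is not needed:

[source, scala]
----
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()

// Declare the schema up front instead of letting the connector sample the data.
val userSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", LongType)
))

val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .schema(userSchema)
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
----
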
107 changes: 98 additions & 9 deletions modules/ROOT/pages/writing.adoc
@@ -189,6 +189,7 @@ Writing data to a Neo4j database can be done in three ways:

If you use the `query` option, the Spark Connector persists the entire Dataset by using the provided query.
The data is sent to Neo4j in batches of rows, with the batch size defined by the `batch.size` property, and your query is wrapped in an `UNWIND $events AS event` statement.
The `query` option supports both `CREATE` and `MERGE` clauses.

Let's look at the following simple Spark program:

@@ -199,11 +200,24 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val df = Seq(
  Person("John", "Doe", 42),
  Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", USERNAME)
  .option("authentication.basic.password", PASSWORD)
  .option("query", query)
  .mode(SaveMode.Overwrite)
  .save()
----

@@ -212,11 +226,48 @@ This generates the following query:
[source,cypher]
----
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})
----

Here, `events` is the batch created from your Dataset.

==== Considerations

* You must always specify the <<save-mode>>.

* You can use the `events` list in `WITH` statements as well.
For example, you can replace the query in the previous example with the following:
+
[source, cypher]
----
WITH event.name + ' ' + toUpper(event.surname) AS fullName
CREATE (n:Person {fullName: fullName})
----

* Subqueries that reference the `events` list in ``CALL``s are supported:
+
[source, cypher]
----
CALL {
  WITH event
  RETURN event.name + ' ' + toUpper(event.surname) AS fullName
}
CREATE (n:Person {fullName: fullName})
----

* If APOC is installed, APOC procedures and functions can be used:
+
[source, cypher]
----
CALL {
  WITH event
  RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
}
CREATE (n:Person {fullName: fullName})
----

* Although a `RETURN` clause is not forbidden, adding one does not have any effect on the query result.
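
* The `query` option also supports `MERGE`.
As an illustrative sketch (same DataFrame and property names as in the example above), the write can be made idempotent by merging on the full name:
+
[source, scala]
----
// MERGE instead of CREATE: re-running the job does not create duplicate Person nodes,
// as long as fullName uniquely identifies a person.
val mergeQuery = "MERGE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", mergeQuery)
  .mode(SaveMode.Overwrite)
  .save()
----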

[[write-node]]
=== Node

@@ -351,6 +402,11 @@ Neo4j Connector for Apache Spark flattens the maps, and each map value is in its own property.

You can write a DataFrame to Neo4j by specifying source nodes, target nodes, and relationships.

[WARNING]
====
To avoid deadlocks, always use a single partition (for example with `coalesce(1)`) before writing relationships to Neo4j.
====

==== Overview

Before diving into the actual process, let's clarify the vocabulary first. Since this method of writing data to Neo4j is more complex and allows for a few combinations of options, it is worth spending more time explaining it.
@@ -417,6 +473,7 @@ val originalDf = spark.read.format("org.neo4j.spark.DataSource")

originalDf
.where("`target.price` > 2000")
.coalesce(1)
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://expensiveprod.host.com:7687")
@@ -474,7 +531,8 @@ val df = spark.read.format("org.neo4j.spark.DataSource")
.option("relationship.target.labels", "Product")
.load()

df.coalesce(1)
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://second.host.com:7687")
.option("relationship", "SOLD")
@@ -534,7 +592,8 @@ val musicDf = Seq(
(15, "John Butler", "Guitar")
).toDF("experience", "name", "instrument")

musicDf.coalesce(1)
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "PLAYS")
@@ -581,7 +640,8 @@ val musicDf = Seq(
(15, "John Butler", "Wooden", "Guitar")
).toDF("experience", "name", "instrument_color", "instrument")

musicDf.coalesce(1)
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "PLAYS")
@@ -624,6 +684,10 @@ You can set the optimization via `schema.optimization.type` option that works on
* `INDEX`: creates only indexes on the provided nodes.
* `NODE_CONSTRAINTS`: creates only constraints on the provided nodes.

[IMPORTANT]
The `schema.optimization.type` option cannot be used with the `query` option.
If you are using a <<write-query, custom Cypher query>>, you need to create indexes and constraints manually using the <<script-option, `script` option>>.
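
As an illustrative sketch (the DataFrame `df`, its `name` and `surname` columns, and the constraint name are assumptions for this example), the `script` and `query` options could be combined as follows:

[source, scala]
----
// The constraint is created by the `script` statement before the import starts,
// since schema optimization is not available together with a custom query.
// `df` is an existing DataFrame with `name` and `surname` columns (assumed here).
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("script",
    "CREATE CONSTRAINT spark_example_person_surname IF NOT EXISTS FOR (p:Person) REQUIRE p.surname IS UNIQUE;")
  .option("query", "MERGE (n:Person {surname: event.surname}) SET n.name = event.name")
  .mode(SaveMode.Overwrite)
  .save()
----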


==== Index creation

@@ -646,12 +710,16 @@ Before the import starts, the following schema query is being created:
CREATE INDEX ON :Person(surname)
----

*Take into consideration that the first label is used for the index creation.*
The name of the created index is `spark_INDEX_<LABEL>_<NODE_KEYS>`, where `<LABEL>` is the first label from the `labels` option and `<NODE_KEYS>` is a dash-separated sequence of one or more properties as specified in the `node.keys` option.
In this example, the name of the created index is `spark_INDEX_Person_surname`.
If the `node.keys` option were set to `"name,surname"` instead, the index name would be `spark_INDEX_Person_name-surname`.

The index is not recreated if it is already present.


==== Constraint creation

Below you can see an example of how to create constraints while you're creating nodes.

----
ds.write
@@ -670,8 +738,13 @@ Before the import starts, the code above creates the following schema query:
CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.surname) IS UNIQUE
----

*Take into consideration that the first label is used for the constraint creation.*
The name of the created constraint is `spark_NODE_CONSTRAINTS_<LABEL>_<NODE_KEYS>`, where `<LABEL>` is the first label from the `labels` option and `<NODE_KEYS>` is a dash-separated sequence of one or more properties as specified in the `node.keys` option.
In this example, the name of the created constraint is `spark_NODE_CONSTRAINTS_Person_surname`.
If the `node.keys` option were set to `"name,surname"` instead, the constraint name would be `spark_NODE_CONSTRAINTS_Person_name-surname`.

The constraint is not recreated if it is already present.

[[script-option]]
=== Script option

The `script` option allows you to execute a series of preparation scripts before Spark
@@ -709,6 +782,22 @@ CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0
`scriptResult` is the result of the last query contained within the `script` option,
that is, `RETURN 36 AS age;`.
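
For context, a minimal sketch of the corresponding write (the DataFrame `df` and its `name` and `surname` columns are assumptions for this example):

[source, scala]
----
// `df` is an existing DataFrame with `name` and `surname` columns (assumed here).
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  // The last statement of the script feeds `scriptResult`, used in the query below.
  .option("script", "RETURN 36 AS age;")
  .option("query",
    "CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .mode(SaveMode.Overwrite)
  .save()
----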

=== Performance considerations

Since writing is typically an expensive operation, make sure you write only the columns you need from the DataFrame.
For example, if the columns from the data source are `name`, `surname`, `age`, and `livesIn`, but you only need `name` and `surname`, you can do the following:

[source, scala]
----
ds.select(ds("name"), ds("surname"))
.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.ErrorIfExists)
.option("url", "bolt://localhost:7687")
.option("labels", ":Person:Customer")
.save()
----

== Note about columns with Map type

When a DataFrame column is a map, we internally flatten it, as Neo4j does not support this type for graph entity properties; so for a Spark job like this:
2 changes: 1 addition & 1 deletion preview.yml
@@ -52,7 +52,7 @@ asciidoc:
includePDF: false
nonhtmloutput: ""
experimental: ''
copyright: 2024
common-license-page-uri: https://neo4j.com/docs/license/
check-mark: icon:check[]
cross-mark: icon:times[]
2 changes: 1 addition & 1 deletion publish.yml
@@ -53,7 +53,7 @@ asciidoc:
includePDF: false
nonhtmloutput: ""
experimental: ''
copyright: 2024
common-license-page-uri: https://neo4j.com/docs/license/
check-mark: icon:check[]
cross-mark: icon:times[]