
Commit dc95f29

Improve docs to help debugging (#28)
1 parent 1a9229f commit dc95f29

6 files changed (+115 -13 lines)

6 files changed

+115
-13
lines changed

.gitignore (+4 -1)
@@ -35,4 +35,7 @@ target/
 
 # node modules
 node_modules/
-.env
+.env
+
+# IDE
+.vscode/

modules/ROOT/pages/faq.adoc (+6 -1)
@@ -81,13 +81,18 @@ We are working to fully support all the save modes on Spark 3.
 
 == I am getting errors _NoClassDefFoundError_ or _ClassNotFoundException_. What should I do?
 
-You may get this type of error:
+You may get one of the following types of error:
 
 ----
 NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
 Caused by: ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
 ----
 
+----
+java.lang.NoClassDefFoundError: scala/collection/IterableOnce
+Caused by: java.lang.ClassNotFoundException: scala.collection.IterableOnce
+----
+
 This means that your Spark version doesn't match the Spark version on the connector.
 Refer to xref:overview.adoc#_spark_and_scala_compatibility[this page] to know which version you need.
 
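The version mismatch behind these errors can be checked from the running session itself. A minimal sketch, assuming an existing `SparkSession`; compare the printed values with the Scala and Spark versions the connector artifact was built for:

[source, scala]
----
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Print the runtime versions to compare against the connector build
// (for example, a Scala 2.12 / Spark 3 build of the connector requires a
// Scala 2.12 runtime and Spark 3.x).
println(s"Spark version: ${spark.version}")
println(s"Scala version: ${scala.util.Properties.versionString}")
----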

modules/ROOT/pages/reading.adoc (+5)
@@ -606,3 +606,8 @@ Where:
 * `<target_labels>` is the list of labels provided by `relationship.target.labels` option
 * `<relationship>` is the list of labels provided by `relationship` option
 * `<limit>` is the value provided via `schema.flatten.limit`
+
+=== Performance considerations
+
+If the schema is not specified, the Spark Connector uses sampling as explained xref:quickstart.adoc#_schema[here] and xref:architecture.adoc#_schema_considerations[here].
+Since sampling is potentially an expensive operation, consider xref:quickstart.adoc#user-defined-schema[supplying your own schema].
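As a sketch of the alternative suggested above, a user-defined schema can be passed to the reader so that the connector does not need to sample the data. The column names and types below are assumptions and must match the actual properties of the nodes being read:

[source, scala]
----
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()

// Hypothetical schema for :Person nodes; adjust the fields to your data.
val personSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("surname", StringType),
  StructField("age", LongType)
))

val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .schema(personSchema)
  .load()
----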

modules/ROOT/pages/writing.adoc (+98 -9)
@@ -189,6 +189,7 @@ Writing data to a Neo4j database can be done in three ways:
 
 In case you use the option `query`, the Spark Connector persists the entire Dataset by using the provided query.
 The nodes are sent to Neo4j in a batch of rows defined in the `batch.size` property, and your query is wrapped up in an `UNWIND $events AS event` statement.
+The `query` option supports both `CREATE` and `MERGE` clauses.
 
 Let's look at the following simple Spark program:
 
@@ -199,11 +200,24 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 val spark = SparkSession.builder().getOrCreate()
 import spark.implicits._
 
-val df = (1 to 10)/*...*/.toDF()
+case class Person(name: String, surname: String, age: Int)
+
+// Create an example DataFrame
+val df = Seq(
+  Person("John", "Doe", 42),
+  Person("Jane", "Doe", 40)
+).toDF()
+
+// Define the Cypher query to use in the write
+val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"
+
 df.write
   .format("org.neo4j.spark.DataSource")
   .option("url", "bolt://localhost:7687")
-  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
+  .option("authentication.basic.username", USERNAME)
+  .option("authentication.basic.password", PASSWORD)
+  .option("query", query)
+  .mode(SaveMode.Overwrite)
   .save()
 ----
 
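Since the `query` option accepts `MERGE` as well as `CREATE`, a variant of the program above could upsert on `fullName` instead of always creating new nodes. A sketch reusing `df`, `USERNAME`, and `PASSWORD` from the example; the merge key and the extra `age` property are illustrative assumptions:

[source, scala]
----
// Illustrative MERGE variant of the query used in the example above.
val mergeQuery =
  "MERGE (n:Person {fullName: event.name + ' ' + event.surname}) " +
    "SET n.age = event.age"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", USERNAME)
  .option("authentication.basic.password", PASSWORD)
  .option("query", mergeQuery)
  .mode(SaveMode.Overwrite)
  .save()
----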
@@ -212,11 +226,48 @@ This generates the following query:
 [source,cypher]
 ----
 UNWIND $events AS event
-CREATE (n:Person {fullName: event.name + event.surname})
+CREATE (n:Person {fullName: event.name + ' ' + event.surname})
 ----
 
 Thus `events` is the batch created from your dataset.
 
+==== Considerations
+
+* You must always specify the <<save-mode>>.
+
+* You can use the `events` list in `WITH` statements as well.
+For example, you can replace the query in the previous example with the following:
++
+[source, cypher]
+----
+WITH event.name + ' ' + toUpper(event.surname) AS fullName
+CREATE (n:Person {fullName: fullName})
+----
+
+* Subqueries that reference the `events` list in ``CALL``s are supported:
++
+[source, cypher]
+----
+CALL {
+  WITH event
+  RETURN event.name + ' ' + toUpper(event.surname) AS fullName
+}
+CREATE (n:Person {fullName: fullName})
+----
+
+* If APOC is installed, APOC procedures and functions can be used:
++
+[source, cypher]
+----
+CALL {
+  WITH event
+  RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
+}
+CREATE (n:Person {fullName: fullName})
+----
+
+* Although a `RETURN` clause is not forbidden, adding one does not have any effect on the query result.
+
 [[write-node]]
 === Node
 
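The `WITH` and `CALL` forms listed in the considerations above are still passed through the same `query` option; a sketch of sending the subquery version from Scala as a multi-line string, reusing `df` from the earlier example:

[source, scala]
----
// Multi-line Cypher string for the CALL subquery form shown above.
val subqueryCypher =
  """CALL {
    |  WITH event
    |  RETURN event.name + ' ' + toUpper(event.surname) AS fullName
    |}
    |CREATE (n:Person {fullName: fullName})""".stripMargin

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", subqueryCypher)
  .mode(SaveMode.Overwrite)
  .save()
----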
@@ -351,6 +402,11 @@ Neo4j Connector for Apache Spark flattens the maps, and each map value is in it'
 
 You can write a DataFrame to Neo4j by specifying source, target nodes, and relationships.
 
+[WARNING]
+====
+To avoid deadlocks, always use a single partition (for example with `coalesce(1)`) before writing relationships to Neo4j.
+====
+
 ==== Overview
 
 Before diving into the actual process, let's clarify the vocabulary first. Since this method of writing data to Neo4j is more complex and few combinations of options can be used, let's spend more time on explaining it.
@@ -417,6 +473,7 @@ val originalDf = spark.read.format("org.neo4j.spark.DataSource")
 
 originalDf
   .where("`target.price` > 2000")
+  .coalesce(1)
   .write
   .format("org.neo4j.spark.DataSource")
   .option("url", "bolt://expensiveprod.host.com:7687")
@@ -474,7 +531,8 @@ val df = spark.read.format("org.neo4j.spark.DataSource")
   .option("relationship.target.labels", "Product")
   .load()
 
-df.write
+df.coalesce(1)
+  .write
   .format("org.neo4j.spark.DataSource")
   .option("url", "bolt://second.host.com:7687")
   .option("relationship", "SOLD")
@@ -534,7 +592,8 @@ val musicDf = Seq(
   (15, "John Butler", "Guitar")
 ).toDF("experience", "name", "instrument")
 
-musicDf.write
+musicDf.coalesce(1)
+  .write
   .format("org.neo4j.spark.DataSource")
   .option("url", "bolt://localhost:7687")
   .option("relationship", "PLAYS")
@@ -581,7 +640,8 @@ val musicDf = Seq(
   (15, "John Butler", "Wooden", "Guitar")
 ).toDF("experience", "name", "instrument_color", "instrument")
 
-musicDf.write
+musicDf.coalesce(1)
+  .write
   .format("org.neo4j.spark.DataSource")
   .option("url", "bolt://localhost:7687")
   .option("relationship", "PLAYS")
@@ -624,6 +684,10 @@ You can set the optimization via `schema.optimization.type` option that works on
 * `INDEX`: it creates only indexes on provided nodes.
 * `NODE_CONSTRAINTS`: it creates only indexes on provided nodes.
 
+[IMPORTANT]
+The `schema.optimization.type` option cannot be used with the `query` option.
+If you are using a <<write-query, custom Cypher query>>, you need to create indexes and constraints manually using the <<script-option, `script` option>>.
+
 
 ==== Index creation
 
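For the case described in the note above (a custom `query` combined with manual index or constraint creation), a sketch of using the `script` option to create a uniqueness constraint before the write runs; the constraint name and merge key are illustrative, and `df` is assumed to be defined as in the earlier examples:

[source, scala]
----
// Create the constraint up front via the script option, since
// schema.optimization.type cannot be combined with a custom query.
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("script",
    "CREATE CONSTRAINT person_fullName IF NOT EXISTS FOR (p:Person) REQUIRE p.fullName IS UNIQUE;")
  .option("query", "MERGE (n:Person {fullName: event.name + ' ' + event.surname})")
  .mode(SaveMode.Overwrite)
  .save()
----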
@@ -646,12 +710,16 @@ Before the import starts, the following schema query is being created:
 CREATE INDEX ON :Person(surname)
 ----
 
-*Take into consideration that the first label is used for the index creation.*
+The name of the created index is `spark_INDEX_<LABEL>_<NODE_KEYS>`, where `<LABEL>` is the first label from the `labels` option and `<NODE_KEYS>` is a dash-separated sequence of one or more properties as specified in the `node.keys` options.
+In this example, the name of the created index is `spark_INDEX_Person_surname`.
+If the `node.keys` option were set to `"name,surname"` instead, the index name would be `spark_INDEX_Person_name-surname`.
+
+The index is not recreated if it is already present.
 
 
 ==== Constraint creation
 
-Below you can see an example of how to create indexes while you're creating nodes.
+Below you can see an example of how to create constraints while you're creating nodes.
 
 ----
 ds.write
@@ -670,8 +738,13 @@ Before the import starts, the code above creates the following schema query:
 CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.surname) IS UNIQUE
 ----
 
-*Take into consideration that the first label is used for the index creation.*
+The name of the created constraint is `spark_NODE_CONSTRAINTS_<LABEL>_<NODE_KEYS>`, where `<LABEL>` is the first label from the `labels` option and `<NODE_KEYS>` is a dash-separated sequence of one or more properties as specified in the `node.keys` options.
+In this example, the name of the created constraint is `spark_NODE_CONSTRAINTS_Person_surname`.
+If the `node.keys` option were set to `"name,surname"` instead, the constraint name would be `spark_NODE_CONSTRAINTS_Person_name-surname`.
 
+The constraint is not recreated if it is already present.
+
+[[script-option]]
 === Script option
 
 The script option allows you to execute a series of preparation script before Spark
@@ -709,6 +782,22 @@ CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0
 `scriptResult` is the result from the last query contained within the `script` options
 that is `RETURN 36 AS age;`
 
+=== Performance considerations
+
+Since writing is typically an expensive operation, make sure you write only the columns you need from the DataFrame.
+For example, if the columns from the data source are `name`, `surname`, `age`, and `livesIn`, but you only need `name` and `surname`, you can do the following:
+
+[source, scala]
+----
+ds.select(ds("name"), ds("surname"))
+  .write
+  .format("org.neo4j.spark.DataSource")
+  .mode(SaveMode.ErrorIfExists)
+  .option("url", "bolt://localhost:7687")
+  .option("labels", ":Person:Customer")
+  .save()
+----
+
 == Note about columns with Map type
 
 When a Dataframe column is a map, what we do internally is to flatten the map as Neo4j does not support this type for graph entity properties; so for a Spark job like this:

preview.yml (+1 -1)
@@ -52,7 +52,7 @@ asciidoc:
 includePDF: false
 nonhtmloutput: ""
 experimental: ''
-copyright: 2023
+copyright: 2024
 common-license-page-uri: https://neo4j.com/docs/license/
 check-mark: icon:check[]
 cross-mark: icon:times[]

publish.yml (+1 -1)
@@ -53,7 +53,7 @@ asciidoc:
 includePDF: false
 nonhtmloutput: ""
 experimental: ''
-copyright: 2023
+copyright: 2024
 common-license-page-uri: https://neo4j.com/docs/license/
 check-mark: icon:check[]
 cross-mark: icon:times[]
