From 28bce1fe34e839ca0975cd7604979975ec7adcbe Mon Sep 17 00:00:00 2001
From: Nicola Vitucci
Date: Tue, 16 Jan 2024 14:23:12 +0000
Subject: [PATCH 01/13] Update gitignore

---
.gitignore | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 40e4a19..0311c21 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,7 @@ target/
# node modules
node_modules/
-.env
\ No newline at end of file
+.env
+
+# IDE
+.vscode/
\ No newline at end of file

From bc5c9e1d20c64193df9137b538b76055d14abfb9 Mon Sep 17 00:00:00 2001
From: Nicola Vitucci
Date: Tue, 16 Jan 2024 14:37:18 +0000
Subject: [PATCH 02/13] Add considerations to write query

---
modules/ROOT/pages/writing.adoc | 66 +++++++++++++++++++++++++++++++--
1 file changed, 63 insertions(+), 3 deletions(-)

diff --git a/modules/ROOT/pages/writing.adoc b/modules/ROOT/pages/writing.adoc
index 2a9464b..60cb6a2 100644
--- a/modules/ROOT/pages/writing.adoc
+++ b/modules/ROOT/pages/writing.adoc
@@ -189,6 +189,7 @@ Writing data to a Neo4j database can be done in three ways:
In case you use the option `query`, the Spark Connector persists the entire Dataset by using the provided query.
The nodes are sent to Neo4j in a batch of rows defined in the `batch.size` property, and your query is wrapped up in an `UNWIND $events AS event` statement.
+The `query` option supports both `CREATE` and `MERGE` clauses.

Let's look at the following simple Spark program:

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

-val df = (1 to 10)/*...*/.toDF()
+case class Person(name: String, surname: String, age: Int)
+
+// Create an example DataFrame
+val df = Seq(
+  Person("John", "Doe", 42),
+  Person("Jane", "Doe", 40)
+).toDF()
+
+// Define the Cypher query to use in the write
+val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"
+
 df.write
   .format("org.neo4j.spark.DataSource")
   .option("url", "bolt://localhost:7687")
-  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
+  .option("authentication.basic.username", USERNAME)
+  .option("authentication.basic.password", PASSWORD)
+  .option("query", query)
+  .mode(SaveMode.Overwrite)
   .save()
----

This generates the following query:

[source,cypher]
----
UNWIND $events AS event
-CREATE (n:Person {fullName: event.name + event.surname})
+CREATE (n:Person {fullName: event.name + ' ' + event.surname})
----

Thus `events` is the batch created from your dataset.

+==== Considerations
+
+* You must specify the write mode (from `SaveMode`):
+** `Append`: uses CREATE (Spark 3.x)
+** `Overwrite`: uses MERGE
+** `ErrorIfExists`: uses CREATE (Spark 2.x)
+
+* You can use the `events` list in `WITH` statements as well.
For example, you can replace the query in the previous example with the following:

[source,scala]
----
val query = """
  |WITH event.name + ' ' + toUpper(event.surname) AS fullName
  |CREATE (n:Person {fullName: fullName})
""".stripMargin
----

* Subqueries that reference the `events` list in ``CALL``s are supported:

[source,scala]
----
val query = """
  |CALL {
  |  WITH event
  |  RETURN event.name + ' ' + toUpper(event.surname) AS fullName
  |}
  |CREATE (n:Person {fullName: fullName})
""".stripMargin
----

* If APOC is installed, APOC procedures and functions can be used:

[source,scala]
----
val query = """
  |CALL {
  |  WITH event
  |  RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
  |}
  |CREATE (n:Person {fullName: fullName})
""".stripMargin
----

* Although a `RETURN` clause is not forbidden, adding one does not have any effect on the query result.

[[write-node]]
=== Node

From c1a01eeecbd28c670eb1b09870fa81e18f2437a1 Mon Sep 17 00:00:00 2001
From: Nicola Vitucci
Date: Tue, 16 Jan 2024 15:13:51 +0000
Subject: [PATCH 03/13] Add example of NoClassDefFoundError to FAQ

---
modules/ROOT/pages/faq.adoc | 7 +++++++
1 file changed, 7 insertions(+)

diff --git a/modules/ROOT/pages/faq.adoc b/modules/ROOT/pages/faq.adoc
index 92dcf8e..1c41312 100644
--- a/modules/ROOT/pages/faq.adoc
+++ b/modules/ROOT/pages/faq.adoc
@@ -88,6 +88,13 @@ NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
Caused by: ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
----

+Or the following:
+
+----
+java.lang.NoClassDefFoundError: scala/collection/IterableOnce
+Caused by: java.lang.ClassNotFoundException: scala.collection.IterableOnce
+----
+
This means that your Spark version doesn't match the Spark version on the connector.
Refer to xref:overview.adoc#_spark_and_scala_compatibility[this page] to know which version you need.

From 85ae314073f0c368bfd108dc39f47da28762393a Mon Sep 17 00:00:00 2001
From: Nicola Vitucci
Date: Tue, 16 Jan 2024 15:21:50 +0000
Subject: [PATCH 04/13] Add reference to Save mode section

---
modules/ROOT/pages/writing.adoc | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/modules/ROOT/pages/writing.adoc b/modules/ROOT/pages/writing.adoc
index 60cb6a2..f7c9414 100644
--- a/modules/ROOT/pages/writing.adoc
+++ b/modules/ROOT/pages/writing.adoc
@@ -233,10 +233,7 @@ Thus `events` is the batch created from your dataset.
==== Considerations

-* You must specify the write mode (from `SaveMode`):
-** `Append`: uses CREATE (Spark 3.x)
-** `Overwrite`: uses MERGE
-** `ErrorIfExists`: uses CREATE (Spark 2.x)
+* You must always specify the <>.

* You can use the `events` list in `WITH` statements as well.
For example, you can replace the query in the previous example with the following:

From 61500195781bfda8c65adf4466ec2477ca23daed Mon Sep 17 00:00:00 2001
From: Nicola Vitucci
Date: Tue, 16 Jan 2024 16:13:24 +0000
Subject: [PATCH 05/13] Add details on indexes and constraints

---
modules/ROOT/pages/writing.adoc | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/modules/ROOT/pages/writing.adoc b/modules/ROOT/pages/writing.adoc
index f7c9414..bcb53d6 100644
--- a/modules/ROOT/pages/writing.adoc
+++ b/modules/ROOT/pages/writing.adoc
@@ -703,7 +703,11 @@ Before the import starts, the following schema query is being created:
CREATE INDEX ON :Person(surname)
----
-*Take into consideration that the first label is used for the index creation.*
+The name of the created index is `spark_INDEX_