diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaSourceWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaSourceWriter.scala index c13209b3..651a2d84 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaSourceWriter.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaSourceWriter.scala @@ -6,6 +6,7 @@ package com.vesoft.nebula.connector.writer import com.vesoft.nebula.connector.NebulaOptions +import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.v2.writer.{ DataSourceWriter, @@ -61,7 +62,11 @@ class NebulaDataSourceVertexWriter(nebulaOptions: NebulaOptions, LOG.debug(s"${messages.length}") for (msg <- messages) { val nebulaMsg = msg.asInstanceOf[NebulaCommitMessage] - LOG.info(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}") + if (nebulaMsg.executeStatements.nonEmpty) { + LOG.error(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}") + } else { + LOG.info(s"execs for spark partition ${TaskContext.getPartitionId()} all succeed") + } } } @@ -89,7 +94,11 @@ class NebulaDataSourceEdgeWriter(nebulaOptions: NebulaOptions, LOG.debug(s"${messages.length}") for (msg <- messages) { val nebulaMsg = msg.asInstanceOf[NebulaCommitMessage] - LOG.info(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}") + if (nebulaMsg.executeStatements.nonEmpty) { + LOG.error(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}") + } else { + LOG.info(s"execs for spark partition ${TaskContext.getPartitionId()} all succeed") + } } } diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala index bcf0e1fe..5b9bf051 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala @@ -37,8 +37,9 @@ class WriteInsertSuite extends AnyFunSuite with BeforeAndAfterAll { Thread.sleep(5000) + graphProvider.submit("use test_write_string;") val resultSet: ResultSet = - graphProvider.submit("use test_write_string;match (v:person_connector) return v;") + graphProvider.submit("match (v:person_connector) return v;") assert(resultSet.getColumnNames.size() == 1) assert(resultSet.getRows.size() == 13) @@ -62,9 +63,9 @@ class WriteInsertSuite extends AnyFunSuite with BeforeAndAfterAll { Thread.sleep(5000) + graphProvider.submit("use test_write_string;") val resultSet: ResultSet = - graphProvider.submit( - "use test_write_string;match (v:person_connector)-[e:friend_connector] -> () return e;") + graphProvider.submit("match (v:person_connector)-[e:friend_connector] -> () return e;") assert(resultSet.getColumnNames.size() == 1) assert(resultSet.getRows.size() == 13)