Skip to content

Commit

Permalink
update log info for failed statement (#29)
Browse files Browse the repository at this point in the history
* update test

* update log info for failed statement

* update match statement

* fix test
  • Loading branch information
Nicole00 authored Jan 10, 2022
1 parent 805adc3 commit aa5a971
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.vesoft.nebula.connector.writer

import com.vesoft.nebula.connector.NebulaOptions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{
DataSourceWriter,
Expand Down Expand Up @@ -61,7 +62,11 @@ class NebulaDataSourceVertexWriter(nebulaOptions: NebulaOptions,
LOG.debug(s"${messages.length}")
for (msg <- messages) {
val nebulaMsg = msg.asInstanceOf[NebulaCommitMessage]
LOG.info(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}")
if (nebulaMsg.executeStatements.nonEmpty) {
LOG.error(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}")
} else {
LOG.info(s"execs for spark partition ${TaskContext.getPartitionId()} all succeed")
}
}
}

Expand Down Expand Up @@ -89,7 +94,11 @@ class NebulaDataSourceEdgeWriter(nebulaOptions: NebulaOptions,
LOG.debug(s"${messages.length}")
for (msg <- messages) {
val nebulaMsg = msg.asInstanceOf[NebulaCommitMessage]
LOG.info(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}")
if (nebulaMsg.executeStatements.nonEmpty) {
LOG.error(s"failed execs:\n ${nebulaMsg.executeStatements.toString()}")
} else {
LOG.info(s"execs for spark partition ${TaskContext.getPartitionId()} all succeed")
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class WriteInsertSuite extends AnyFunSuite with BeforeAndAfterAll {

Thread.sleep(5000)

graphProvider.submit("use test_write_string;")
val resultSet: ResultSet =
graphProvider.submit("use test_write_string;match (v:person_connector) return v;")
graphProvider.submit("match (v:person_connector) return v;")
assert(resultSet.getColumnNames.size() == 1)
assert(resultSet.getRows.size() == 13)

Expand All @@ -62,9 +63,9 @@ class WriteInsertSuite extends AnyFunSuite with BeforeAndAfterAll {

Thread.sleep(5000)

graphProvider.submit("use test_write_string;")
val resultSet: ResultSet =
graphProvider.submit(
"use test_write_string;match (v:person_connector)-[e:friend_connector] -> () return e;")
graphProvider.submit("match (v:person_connector)-[e:friend_connector] -> () return e;")
assert(resultSet.getColumnNames.size() == 1)
assert(resultSet.getRows.size() == 13)

Expand Down

0 comments on commit aa5a971

Please sign in to comment.