Skip to content

Commit

Permalink
re-import the batch data one by one when batch write failed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Nov 17, 2023
1 parent c094a21 commit db824fb
Showing 1 changed file with 59 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.vesoft.nebula.ErrorCode
import org.apache.log4j.Logger

import scala.collection.JavaConversions.seqAsJavaList
import scala.collection.mutable.ListBuffer

abstract class ServerBaseWriter extends Writer {
private[this] val BATCH_INSERT_TEMPLATE = "INSERT %s `%s`(%s) VALUES %s"
Expand Down Expand Up @@ -267,9 +268,9 @@ abstract class ServerBaseWriter extends Writer {
.mkString(";")
}

def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String
def writeVertices(vertices: Vertices, ignoreIndex: Boolean): List[String]

def writeEdges(edges: Edges, ignoreIndex: Boolean): String
def writeEdges(edges: Edges, ignoreIndex: Boolean): List[String]

def writeNgql(ngql: String): String
}
Expand Down Expand Up @@ -332,45 +333,87 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
sentence
}

override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): String = {
val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode)
override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): List[String] = {
val failedStatements = new ListBuffer[String]()
val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode)
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, statement)
if (result._2.isSucceeded) {
LOG.info(
s">>>>> write ${config.name}, batch size(${vertices.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency})")
return null
s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})")
return failedStatements.toList
}
LOG.error(
s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement")
if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}")
}
LOG.error(
s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.")
// re-execute the vertices one by one
vertices.values.foreach(value => {
val vers = Vertices(vertices.names, List(value), vertices.policy)
val failedStatement = writeVertex(vers)
if (failedStatement != null) failedStatements.append(failedStatement)
})
} else {
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
}
statement
failedStatements.toList
}

override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): String = {
val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode)
private def writeVertex(vertices: Vertices): String = {
val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode)
val result = graphProvider.submit(session, statement)
if (result._2.isSucceeded) {
LOG.info(
s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})")
null
} else {
LOG.error(s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement")
statement
}
}

override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): List[String] = {
val failedStatements = new ListBuffer[String]()
val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode)
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, statement)
if (result._2.isSucceeded) {
LOG.info(
s">>>>>> write ${config.name}, batch size(${edges.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency}us)")
return null
s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency}us)")
return failedStatements.toList
}
LOG.error(s">>>>>> write edge failed for ${result._2.getErrorMessage}")
if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}")
s">>>>>> write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}")
}
LOG.error(
s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.")
// re-execute the edges one by one
edges.values.foreach(value => {
val es = Edges(edges.names, List(value), edges.sourcePolicy, edges.targetPolicy)
val failedStatement = writeEdge(es)
if (failedStatement != null) failedStatements.append(failedStatement)
})

} else {
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
}
statement
failedStatements.toList
}

private def writeEdge(edges: Edges): String = {
val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode)
val result = graphProvider.submit(session, statement)
if (result._2.isSucceeded) {
LOG.info(
s">>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency})")
null
} else {
LOG.error(s">>>>> write edge failed for ${result._2.getErrorMessage} statement: \n $statement")
statement
}
}

override def writeNgql(ngql: String): String = {
Expand Down

0 comments on commit db824fb

Please sign in to comment.