From db824fb8f082c043dd29ee18d56be40bde617bac Mon Sep 17 00:00:00 2001 From: Anqi Date: Fri, 17 Nov 2023 15:30:49 +0800 Subject: [PATCH] re-import the batch data one by one when batch write failed --- .../common/writer/ServerBaseWriter.scala | 75 +++++++++++++++---- 1 file changed, 59 insertions(+), 16 deletions(-) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala index 0f208ce..7444b93 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala @@ -23,6 +23,7 @@ import com.vesoft.nebula.ErrorCode import org.apache.log4j.Logger import scala.collection.JavaConversions.seqAsJavaList +import scala.collection.mutable.ListBuffer abstract class ServerBaseWriter extends Writer { private[this] val BATCH_INSERT_TEMPLATE = "INSERT %s `%s`(%s) VALUES %s" @@ -267,9 +268,9 @@ abstract class ServerBaseWriter extends Writer { .mkString(";") } - def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String + def writeVertices(vertices: Vertices, ignoreIndex: Boolean): List[String] - def writeEdges(edges: Edges, ignoreIndex: Boolean): String + def writeEdges(edges: Edges, ignoreIndex: Boolean): List[String] def writeNgql(ngql: String): String } @@ -332,45 +333,87 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, sentence } - override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): String = { - val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) + override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): List[String] = { + val failedStatements = new ListBuffer[String]() + val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, statement) if (result._2.isSucceeded) { LOG.info( - s">>>>> write ${config.name}, batch size(${vertices.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency})") - return null + s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})") + return failedStatements.toList } - LOG.error( - s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement") if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") } + LOG.error( + s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.") + // re-execute the vertices one by one + vertices.values.foreach(value => { + val vers = Vertices(vertices.names, List(value), vertices.policy) + val failedStatement = writeVertex(vers) + if (failedStatement != null) failedStatements.append(failedStatement) + }) } else { LOG.error(s">>>>>> write vertex failed because write speed is too fast") } - statement + failedStatements.toList } - override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): String = { - val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) + private def writeVertex(vertices: Vertices): String = { + val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) + val result = graphProvider.submit(session, statement) + if (result._2.isSucceeded) { + LOG.info( + s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})") + null + } else { + LOG.error(s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement") + statement + } + } + + override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): List[String] = { + val failedStatements = new ListBuffer[String]() + val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, statement) if (result._2.isSucceeded) { LOG.info( - s">>>>>> write ${config.name}, batch size(${edges.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency}us)") - return null + s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency}us)") + return failedStatements.toList } - LOG.error(s">>>>>> write edge failed for ${result._2.getErrorMessage}") if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( - s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") + s">>>>>> write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") } + LOG.error( + s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.") + // re-execute the edges one by one + edges.values.foreach(value => { + val es = Edges(edges.names, List(value), edges.sourcePolicy, edges.targetPolicy) + val failedStatement = writeEdge(es) + if (failedStatement != null) failedStatements.append(failedStatement) + }) + } else { LOG.error(s">>>>>> write vertex failed because write speed is too fast") } - statement + failedStatements.toList + } + + private def writeEdge(edges: Edges): String = { + val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) + val result = graphProvider.submit(session, statement) + if (result._2.isSucceeded) { + LOG.info( + s">>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency})") + null + } else { + LOG.error(s">>>>> write edge failed for ${result._2.getErrorMessage} statement: \n $statement") + statement + } } override def writeNgql(ngql: String): String = {