diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala index afb729f8..d76676ab 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala @@ -49,11 +49,10 @@ class ReloadProcessor(data: DataFrame, val startTime = System.currentTimeMillis iterator.foreach { row => val ngql = row.getString(0) - val recordSize = computeRecordNumber(ngql) - val failStatement = writer.writeNgql(row.getString(0)) + val failStatement = writer.writeNgql(ngql) if (failStatement == null) { batchSuccess.add(1) - recordSuccess.add(recordSize) + recordSuccess.add(1) } else { errorBuffer.append(failStatement) batchFailure.add(1) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala index 0f208cea..7444b933 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala @@ -23,6 +23,7 @@ import com.vesoft.nebula.ErrorCode import org.apache.log4j.Logger import scala.collection.JavaConversions.seqAsJavaList +import scala.collection.mutable.ListBuffer abstract class ServerBaseWriter extends Writer { private[this] val BATCH_INSERT_TEMPLATE = "INSERT %s `%s`(%s) VALUES %s" @@ -267,9 +268,9 @@ abstract class ServerBaseWriter extends Writer { .mkString(";") } - def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String + def writeVertices(vertices: Vertices, ignoreIndex: Boolean): List[String] - def writeEdges(edges: Edges, ignoreIndex: Boolean): String + def writeEdges(edges: Edges, ignoreIndex: Boolean): List[String] def writeNgql(ngql: String): String } 
@@ -332,45 +333,87 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, sentence } - override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): String = { - val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) + override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): List[String] = { + val failedStatements = new ListBuffer[String]() + val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, statement) if (result._2.isSucceeded) { LOG.info( - s">>>>> write ${config.name}, batch size(${vertices.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency})") - return null + s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})") + return failedStatements.toList } - LOG.error( - s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement") if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") } + LOG.error( + s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.") + // re-execute the vertices one by one + vertices.values.foreach(value => { + val vers = Vertices(vertices.names, List(value), vertices.policy) + val failedStatement = writeVertex(vers) + if (failedStatement != null) failedStatements.append(failedStatement) + }) } else { LOG.error(s">>>>>> write vertex failed because write speed is too fast") } - statement + failedStatements.toList } - override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): String = { - val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) + private def writeVertex(vertices: Vertices): String = { + val statement = 
execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) + val result = graphProvider.submit(session, statement) + if (result._2.isSucceeded) { + LOG.info( + s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})") + null + } else { + LOG.error(s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement") + statement + } + } + + override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): List[String] = { + val failedStatements = new ListBuffer[String]() + val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, statement) if (result._2.isSucceeded) { LOG.info( - s">>>>>> write ${config.name}, batch size(${edges.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency}us)") - return null + s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency}us)") + return failedStatements.toList } - LOG.error(s">>>>>> write edge failed for ${result._2.getErrorMessage}") if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( - s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") + s">>>>>> write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") } + LOG.error( + s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.") + // re-execute the edges one by one + edges.values.foreach(value => { + val es = Edges(edges.names, List(value), edges.sourcePolicy, edges.targetPolicy) + val failedStatement = writeEdge(es) + if (failedStatement != null) failedStatements.append(failedStatement) + }) + } else { LOG.error(s">>>>>> write vertex failed because write speed is too fast") } - statement + failedStatements.toList + } + + private def writeEdge(edges: Edges): String = { + 
val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) + val result = graphProvider.submit(session, statement) + if (result._2.isSucceeded) { + LOG.info( + s">>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency})") + null + } else { + LOG.error(s">>>>> write edge failed for ${result._2.getErrorMessage} statement: \n $statement") + statement + } } override def writeNgql(ngql: String): String = { diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala index e166ea6b..5a8d77f5 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala @@ -193,9 +193,9 @@ class ServerBaseWriterSuite extends ServerBaseWriter { } - override def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String = ??? + override def writeVertices(vertices: Vertices, ignoreIndex: Boolean): List[String] = ??? - override def writeEdges(edges: common.Edges, ignoreIndex: Boolean): String = ??? + override def writeEdges(edges: common.Edges, ignoreIndex: Boolean): List[String] = ??? override def writeNgql(ngql: String): String = ??? 
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 6aad4090..f150d939 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -137,11 +137,10 @@ object Exchange { val data = spark.read.text(c.reload) val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess) processor.process() - LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}") - LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}") - LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}") + LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") LOG.info( - s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) + s">>>>> exchange reimport job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) .formatted("%.2f")}s") sys.exit(0) @@ -289,31 +288,6 @@ object Exchange { } } - // reimport for failed tags and edges - val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}" - if (failures > 0 && ErrorHandler.existError(errorPath)) { - spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}") - - val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport") - val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") - val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport") - val data = spark.read.text(errorPath) - val start = System.currentTimeMillis() - val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess) - processor.process() - val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") - 
LOG.info(s">>>>> reimport ngql cost time: ${costTime}") - LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") - LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") - LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}") - totalClientBatchSuccess += batchSuccess.value - totalClientBatchFailure -= batchSuccess.value - totalClientRecordSuccess += recordSuccess.value - totalClientRecordFailure -= recordSuccess.value - if (totalClientRecordFailure < 0) { - totalClientRecordFailure = 0 - } - } spark.close() val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble LOG.info( diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 86dcf55f..9fb18114 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -67,16 +67,18 @@ class EdgeProcessor(spark: SparkSession, writer.prepare() // batch write tags val startTime = System.currentTimeMillis - iterator.grouped(edgeConfig.batch).foreach { edge => - val edges = Edges(nebulaKeys, edge.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy) - val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex) - if (failStatement == null) { + iterator.grouped(edgeConfig.batch).foreach { edgeSet => + val edges = + Edges(nebulaKeys, edgeSet.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy) + val failStatements = writer.writeEdges(edges, edgeConfig.ignoreIndex) + if (failStatements.isEmpty) { batchSuccess.add(1) - recordSuccess.add(edge.toList.size) + recordSuccess.add(edgeSet.size) } else { - errorBuffer.append(failStatement) + errorBuffer.append(failStatements: _*) batchFailure.add(1) - recordFailure.add(edge.toList.size) + 
recordSuccess.add(edgeSet.size - failStatements.size) + recordFailure.add(failStatements.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { writeErrorStatement(errorBuffer) throw TooManyErrorsException( diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 73d53ae0..f980d8f6 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -74,16 +74,17 @@ class VerticesProcessor(spark: SparkSession, writer.prepare() // batch write tags val startTime = System.currentTimeMillis - iterator.grouped(tagConfig.batch).foreach { vertex => - val vertices = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy) - val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex) - if (failStatement == null) { + iterator.grouped(tagConfig.batch).foreach { vertexSet => + val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy) + val failStatements = writer.writeVertices(vertices, tagConfig.ignoreIndex) + if (failStatements.isEmpty) { batchSuccess.add(1) - recordSuccess.add(vertex.toList.size) + recordSuccess.add(vertexSet.size) } else { - errorBuffer.append(failStatement) + errorBuffer.append(failStatements: _*) batchFailure.add(1) - recordFailure.add(vertex.toList.size) + recordSuccess.add(vertexSet.size - failStatements.size) + recordFailure.add(failStatements.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { writeErrorStatement(errorBuffer) throw TooManyErrorsException( diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 3f065280..8832d253 100644 --- 
a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -137,11 +137,10 @@ object Exchange { val data = spark.read.text(c.reload) val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess) processor.process() - LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}") - LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}") - LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}") + LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") LOG.info( - s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) + s">>>>> exchange reimport job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) .formatted("%.2f")}s") sys.exit(0) } @@ -288,30 +287,6 @@ object Exchange { } } - // reimport for failed tags and edges - val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}" - if (failures > 0 && ErrorHandler.existError(errorPath)) { - spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}") - val start = System.currentTimeMillis() - val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport") - val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") - val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport") - val data = spark.read.text(errorPath) - val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess) - processor.process() - val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") - LOG.info(s">>>>> reimport ngql cost time: ${costTime}") - LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") - LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") - LOG.info(s">>>>> 
recordSuccess.reimport: ${recordSuccess.value}") - totalClientBatchSuccess += batchSuccess.value - totalClientBatchFailure -= batchSuccess.value - totalClientRecordSuccess += recordSuccess.value - totalClientRecordFailure -= recordSuccess.value - if (totalClientRecordFailure < 0) { - totalClientRecordFailure = 0 - } - } spark.close() val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble LOG.info( diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index a458c3b0..de8a9db3 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -43,8 +43,8 @@ class EdgeProcessor(spark: SparkSession, config: Configs, batchSuccess: LongAccumulator, batchFailure: LongAccumulator, - recordSuccess:LongAccumulator, - recordFailure:LongAccumulator) + recordSuccess: LongAccumulator, + recordFailure: LongAccumulator) extends Processor { @transient @@ -68,16 +68,18 @@ class EdgeProcessor(spark: SparkSession, writer.prepare() // batch write tags val startTime = System.currentTimeMillis - iterator.grouped(edgeConfig.batch).foreach { edge => - val edges = Edges(nebulaKeys, edge.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy) - val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex) - if (failStatement == null) { + iterator.grouped(edgeConfig.batch).foreach { edgeSet => + val edges = + Edges(nebulaKeys, edgeSet.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy) + val failStatements = writer.writeEdges(edges, edgeConfig.ignoreIndex) + if (failStatements.isEmpty) { batchSuccess.add(1) - recordSuccess.add(edge.toList.size) + recordSuccess.add(edgeSet.size) } else { - errorBuffer.append(failStatement) + 
errorBuffer.append(failStatements: _*) batchFailure.add(1) - recordFailure.add(edge.toList.size) + recordSuccess.add(edgeSet.size - failStatements.size) + recordFailure.add(failStatements.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { writeErrorStatement(errorBuffer) throw TooManyErrorsException( @@ -427,7 +429,7 @@ class EdgeProcessor(spark: SparkSession, (positiveEdgeKey, reverseEdgeKey, edgeValue) } - private def writeErrorStatement(errorBuffer:ArrayBuffer[String]): Unit = { + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { if (errorBuffer.nonEmpty) { val appId = SparkEnv.get.blockManager.conf.getAppId ErrorHandler.save( diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index ece00520..dc06fb19 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -52,8 +52,8 @@ class VerticesProcessor(spark: SparkSession, config: Configs, batchSuccess: LongAccumulator, batchFailure: LongAccumulator, - recordSuccess:LongAccumulator, - recordFailure:LongAccumulator) + recordSuccess: LongAccumulator, + recordFailure: LongAccumulator) extends Processor { @transient @@ -76,16 +76,17 @@ class VerticesProcessor(spark: SparkSession, writer.prepare() // batch write tags val startTime = System.currentTimeMillis - iterator.grouped(tagConfig.batch).foreach { vertex => - val vertices = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy) - val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex) - if (failStatement == null) { + iterator.grouped(tagConfig.batch).foreach { vertexSet => + val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy) + val failStatements = 
writer.writeVertices(vertices, tagConfig.ignoreIndex) + if (failStatements.isEmpty) { batchSuccess.add(1) - recordSuccess.add(vertex.toList.size) + recordSuccess.add(vertexSet.size) } else { - errorBuffer.append(failStatement) + errorBuffer.append(failStatements: _*) batchFailure.add(1) - recordFailure.add(vertex.toList.size) + recordSuccess.add(vertexSet.size - failStatements.size) + recordFailure.add(failStatements.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { writeErrorStatement(errorBuffer) throw TooManyErrorsException( diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 8729deb6..b53190e1 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -137,9 +137,8 @@ object Exchange { val data = spark.read.text(c.reload) val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess) processor.process() - LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}") - LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}") - LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}") + LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") LOG.info( s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) .formatted("%.2f")}s") @@ -287,30 +286,6 @@ object Exchange { } } - // reimport for failed tags and edges - val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}" - if (failures > 0 && ErrorHandler.existError(errorPath)) { - spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}") - val start = System.currentTimeMillis() - val batchSuccess = 
spark.sparkContext.longAccumulator(s"batchSuccess.reimport") - val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") - val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport") - val data = spark.read.text(errorPath) - val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess) - processor.process() - val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") - LOG.info(s">>>>> reimport ngql cost time: ${costTime}") - LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") - LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") - LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}") - totalClientBatchSuccess += batchSuccess.value - totalClientBatchFailure -= batchSuccess.value - totalClientRecordSuccess += recordSuccess.value - totalClientRecordFailure -= recordSuccess.value - if (totalClientRecordFailure < 0) { - totalClientRecordFailure = 0 - } - } spark.close() val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble LOG.info( diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index a7002ecc..b2e4aa81 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -43,8 +43,8 @@ class EdgeProcessor(spark: SparkSession, config: Configs, batchSuccess: LongAccumulator, batchFailure: LongAccumulator, - recordSuccess:LongAccumulator, - recordFailure:LongAccumulator) + recordSuccess: LongAccumulator, + recordFailure: LongAccumulator) extends Processor { @transient @@ -68,16 +68,18 @@ class EdgeProcessor(spark: SparkSession, writer.prepare() // batch write tags val startTime = 
System.currentTimeMillis - iterator.grouped(edgeConfig.batch).foreach { edge => - val edges = Edges(nebulaKeys, edge.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy) - val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex) - if (failStatement == null) { + iterator.grouped(edgeConfig.batch).foreach { edgeSet => + val edges = + Edges(nebulaKeys, edgeSet.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy) + val failStatements = writer.writeEdges(edges, edgeConfig.ignoreIndex) + if (failStatements.isEmpty) { batchSuccess.add(1) - recordSuccess.add(edge.toList.size) + recordSuccess.add(edgeSet.toList.size) } else { - errorBuffer.append(failStatement) + errorBuffer.append(failStatements: _*) batchFailure.add(1) - recordFailure.add(edge.toList.size) + recordSuccess.add(edgeSet.size - failStatements.size) + recordFailure.add(failStatements.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { writeErrorStatement(errorBuffer) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index d39785e3..3299bef1 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -76,16 +76,17 @@ class VerticesProcessor(spark: SparkSession, writer.prepare() // batch write tags val startTime = System.currentTimeMillis - iterator.grouped(tagConfig.batch).foreach { vertex => - val vertices = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy) - val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex) - if (failStatement == null) { + iterator.grouped(tagConfig.batch).foreach { vertexSet => + val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy) + val failStatements = 
writer.writeVertices(vertices, tagConfig.ignoreIndex) + if (failStatements.isEmpty) { batchSuccess.add(1) - recordSuccess.add(vertex.toList.size) + recordSuccess.add(vertexSet.size) } else { - errorBuffer.append(failStatement) + errorBuffer.append(failStatements: _*) batchFailure.add(1) - recordFailure.add(vertex.toList.size) + recordSuccess.add(vertexSet.size - failStatements.size) + recordFailure.add(failStatements.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { writeErrorStatement(errorBuffer) throw TooManyErrorsException(