diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala index 5c7dcb02..9326f915 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala @@ -31,7 +31,7 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry: @transient val nebulaPoolConfig = new NebulaPoolConfig @transient val pool: NebulaPool = new NebulaPool - val randAddr = scala.util.Random.shuffle(addresses) + val randAddr = scala.util.Random.shuffle(addresses) nebulaPoolConfig.setTimeout(timeout) @@ -63,14 +63,15 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry: pool.close() } - def switchSpace(session: Session, space: String): ResultSet = { + def switchSpace(session: Session, space: String): (HostAddress, ResultSet) = { val switchStatment = s"use $space" LOG.info(s">>>>>> switch space $space") val result = submit(session, switchStatment) result } - def submit(session: Session, statement: String): ResultSet = { - session.execute(statement) + def submit(session: Session, statement: String): (HostAddress, ResultSet) = { + val result = session.execute(statement) + (session.getGraphHost,result) } } diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala index 56005139..0f208cea 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala @@ -9,7 +9,16 @@ import java.util.concurrent.TimeUnit import com.google.common.util.concurrent.RateLimiter import com.vesoft.exchange.common.GraphProvider import com.vesoft.exchange.common.{Edges, KeyPolicy, Vertices} -import com.vesoft.exchange.common.config.{DataBaseConfigEntry, EdgeConfigEntry, RateConfigEntry, SchemaConfigEntry, TagConfigEntry, Type, UserConfigEntry, WriteMode} +import com.vesoft.exchange.common.config.{ + DataBaseConfigEntry, + EdgeConfigEntry, + RateConfigEntry, + SchemaConfigEntry, + TagConfigEntry, + Type, + UserConfigEntry, + WriteMode +} import com.vesoft.nebula.ErrorCode import org.apache.log4j.Logger @@ -36,8 +45,8 @@ abstract class ServerBaseWriter extends Writer { private[this] val UPDATE_VALUE_TEMPLATE = "`%s`=%s" /** - * construct insert statement for vertex - */ + * construct insert statement for vertex + */ def toExecuteSentence(name: String, vertices: Vertices, ignoreIndex: Boolean): String = { { if (ignoreIndex) BATCH_INSERT_IGNORE_INDEX_TEMPLATE else BATCH_INSERT_TEMPLATE } .format( @@ -67,8 +76,8 @@ abstract class ServerBaseWriter extends Writer { } /** - * construct delete statement for vertex - */ + * construct delete statement for vertex + */ def toDeleteExecuteSentence(vertices: Vertices, deleteEdge: Boolean): String = { { if (deleteEdge) BATCH_DELETE_VERTEX_WITH_EDGE_TEMPLATE else BATCH_DELETE_VERTEX_TEMPLATE } .format( @@ -96,8 +105,8 @@ abstract class ServerBaseWriter extends Writer { } /** - * construct update statement for vertex - */ + * construct update statement for vertex + */ def toUpdateExecuteSentence(tagName: String, vertices: Vertices): String = { vertices.values .map { vertex => @@ -130,8 +139,8 @@ abstract class ServerBaseWriter extends Writer { } /** - * construct insert statement for edge - */ + * construct insert statement for edge + */ def toExecuteSentence(name: String, edges: Edges, ignoreIndex: Boolean): String = { val values = edges.values .map { edge => @@ -175,8 +184,8 @@ abstract class ServerBaseWriter extends Writer { } /** - * construct delete statement for edge - */ + * construct delete statement for edge + */ def toDeleteExecuteSentence(edgeName: String, edges: Edges): String = { BATCH_DELETE_EDGE_TEMPLATE.format( Type.EDGE.toString, @@ -212,48 +221,48 @@ abstract class ServerBaseWriter extends Writer { } /** - * construct update statement for edge - */ + * construct update statement for edge + */ def toUpdateExecuteSentence(edgeName: String, edges: Edges): String = { edges.values .map { edge => - var index = 0 - val rank = if (edge.ranking.isEmpty) { 0 } else { edge.ranking.get } - UPDATE_EDGE_TEMPLATE.format( - Type.EDGE.toString, - edgeName, - edges.sourcePolicy match { - case Some(KeyPolicy.HASH) => - ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.source) - case Some(KeyPolicy.UUID) => - ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, edge.source) - case None => - edge.source - case _ => - throw new IllegalArgumentException( - s"source policy ${edges.sourcePolicy.get} is not supported") - }, - edges.targetPolicy match { - case Some(KeyPolicy.HASH) => - ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination) - case Some(KeyPolicy.UUID) => - ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination) - case None => - edge.destination - case _ => - throw new IllegalArgumentException( - s"target policy ${edges.targetPolicy.get} is not supported") - }, - rank, - edge.values - .map { value => - val updateValue = - UPDATE_VALUE_TEMPLATE.format(edges.names.get(index), value) - index += 1 - updateValue - } - .mkString(",") - ) + var index = 0 + val rank = if (edge.ranking.isEmpty) { 0 } else { edge.ranking.get } + UPDATE_EDGE_TEMPLATE.format( + Type.EDGE.toString, + edgeName, + edges.sourcePolicy match { + case Some(KeyPolicy.HASH) => + ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.source) + case Some(KeyPolicy.UUID) => + ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, edge.source) + case None => + edge.source + case _ => + throw new IllegalArgumentException( + s"source policy ${edges.sourcePolicy.get} is not supported") + }, + edges.targetPolicy match { + case Some(KeyPolicy.HASH) => + ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination) + case Some(KeyPolicy.UUID) => + ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination) + case None => + edge.destination + case _ => + throw new IllegalArgumentException( + s"target policy ${edges.targetPolicy.get} is not supported") + }, + rank, + edge.values + .map { value => + val updateValue = + UPDATE_VALUE_TEMPLATE.format(edges.names.get(index), value) + index += 1 + updateValue + } + .mkString(",") + ) } .mkString(";") } @@ -287,9 +296,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, def prepare(): Unit = { val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space) - if (!switchResult.isSucceeded) { + if (!switchResult._2.isSucceeded) { this.close() - throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage) + throw new RuntimeException("Switch Failed for " + switchResult._2.getErrorMessage) } LOG.info(s">>>>>> Connection to ${dataBaseConfigEntry.graphAddress}") @@ -327,15 +336,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, statement) - if (result.isSucceeded) { + if (result._2.isSucceeded) { LOG.info( - s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})") + s">>>>> write ${config.name}, batch size(${vertices.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency})") return null } - LOG.error(s">>>>> write vertex failed for ${result.getErrorMessage} statement: \n $statement") - if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { + LOG.error( + s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement") + if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( - s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}") + s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") } } else { LOG.error(s">>>>>> write vertex failed because write speed is too fast") @@ -347,15 +357,15 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, statement) - if (result.isSucceeded) { + if (result._2.isSucceeded) { LOG.info( - s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)") + s">>>>>> write ${config.name}, batch size(${edges.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency}us)") return null } - LOG.error(s">>>>>> write edge failed for ${result.getErrorMessage}") - if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { + LOG.error(s">>>>>> write edge failed for ${result._2.getErrorMessage}") + if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( - s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}") + s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}") } } else { LOG.error(s">>>>>> write vertex failed because write speed is too fast") @@ -366,10 +376,10 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, override def writeNgql(ngql: String): String = { if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, ngql) - if (result.isSucceeded) { + if (result._2.isSucceeded) { return null } - LOG.error(s">>>>>> reimport ngql failed for ${result.getErrorMessage}") + LOG.error(s">>>>>> reimport ngql failed for ${result._2.getErrorMessage}") } else { LOG.error(s">>>>>> reimport ngql failed because write speed is too fast") } diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala index a851813e..552f3f9a 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/GraphProviderSuite.scala @@ -38,15 +38,15 @@ class GraphProviderSuite { @Test def switchSpaceSuite(): Unit = { session = graphProvider.getGraphClient(userConfig) - assert(graphProvider.switchSpace(session, "test_string").isSucceeded) - assert(graphProvider.switchSpace(session, "test_int").isSucceeded) + assert(graphProvider.switchSpace(session, "test_string")._2.isSucceeded) + assert(graphProvider.switchSpace(session, "test_int")._2.isSucceeded) graphProvider.releaseGraphClient(session) } @Test def submitSuite(): Unit = { session = graphProvider.getGraphClient(userConfig) - assert(graphProvider.submit(session, "show hosts").isSucceeded) + assert(graphProvider.submit(session, "show hosts")._2.isSucceeded) graphProvider.releaseGraphClient(session) } diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 76be8920..6aad4090 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -315,15 +315,17 @@ object Exchange { } } spark.close() + val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble LOG.info( - s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0) - .formatted("%.2f")}s \n" + + s"\n>>>>>> exchange job finished, cost ${duration}s \n" + s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" + s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" + s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" + s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" + s">>>>>> total SST failure:${totalSstRecordFailure} \n" + s">>>>>> total SST Success:${totalSstRecordSuccess}") + LOG.info( + s">>>>>> exchange import qps: ${(totalClientRecordSuccess / duration).formatted("%.2f")}/s") } /** diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 88311618..3f065280 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -313,15 +313,16 @@ object Exchange { } } spark.close() + val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble LOG.info( - s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0) - .formatted("%.2f")}s \n" + + s"\n>>>>>> exchange job finished, cost ${duration}s \n" + s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" + s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" + s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" + s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" + s">>>>>> total SST failure:${totalSstRecordFailure} \n" + s">>>>>> total SST Success:${totalSstRecordSuccess}") + LOG.info(s">>>>>> exchange import qps: ${(totalClientRecordSuccess/duration).formatted("%.2f")}/s") } /** diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 12a41f5d..8729deb6 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -312,15 +312,17 @@ object Exchange { } } spark.close() + val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble LOG.info( - s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0) - .formatted("%.2f")}s \n" + + s"\n>>>>>> exchange job finished, cost ${duration}s \n" + s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" + s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" + s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" + s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" + s">>>>>> total SST failure:${totalSstRecordFailure} \n" + s">>>>>> total SST Success:${totalSstRecordSuccess}") + LOG.info( + s">>>>>> exchange import qps: ${(totalClientRecordSuccess / duration).formatted("%.2f")}/s") } /**