Skip to content

Commit

Permalink
add graph address for latency info (#175)
Browse files Browse the repository at this point in the history
* add graph address for latency info

* add qps log
  • Loading branch information
Nicole00 authored Nov 16, 2023
1 parent a87e998 commit c094a21
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry:

@transient val nebulaPoolConfig = new NebulaPoolConfig
@transient val pool: NebulaPool = new NebulaPool
val randAddr = scala.util.Random.shuffle(addresses)
val randAddr = scala.util.Random.shuffle(addresses)

nebulaPoolConfig.setTimeout(timeout)

Expand Down Expand Up @@ -63,14 +63,15 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry:
pool.close()
}

def switchSpace(session: Session, space: String): ResultSet = {
def switchSpace(session: Session, space: String): (HostAddress, ResultSet) = {
val switchStatment = s"use $space"
LOG.info(s">>>>>> switch space $space")
val result = submit(session, switchStatment)
result
}

def submit(session: Session, statement: String): ResultSet = {
session.execute(statement)
def submit(session: Session, statement: String): (HostAddress, ResultSet) = {
val result = session.execute(statement)
(session.getGraphHost,result)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ import java.util.concurrent.TimeUnit
import com.google.common.util.concurrent.RateLimiter
import com.vesoft.exchange.common.GraphProvider
import com.vesoft.exchange.common.{Edges, KeyPolicy, Vertices}
import com.vesoft.exchange.common.config.{DataBaseConfigEntry, EdgeConfigEntry, RateConfigEntry, SchemaConfigEntry, TagConfigEntry, Type, UserConfigEntry, WriteMode}
import com.vesoft.exchange.common.config.{
DataBaseConfigEntry,
EdgeConfigEntry,
RateConfigEntry,
SchemaConfigEntry,
TagConfigEntry,
Type,
UserConfigEntry,
WriteMode
}
import com.vesoft.nebula.ErrorCode
import org.apache.log4j.Logger

Expand All @@ -36,8 +45,8 @@ abstract class ServerBaseWriter extends Writer {
private[this] val UPDATE_VALUE_TEMPLATE = "`%s`=%s"

/**
* construct insert statement for vertex
*/
* construct insert statement for vertex
*/
def toExecuteSentence(name: String, vertices: Vertices, ignoreIndex: Boolean): String = {
{ if (ignoreIndex) BATCH_INSERT_IGNORE_INDEX_TEMPLATE else BATCH_INSERT_TEMPLATE }
.format(
Expand Down Expand Up @@ -67,8 +76,8 @@ abstract class ServerBaseWriter extends Writer {
}

/**
* construct delete statement for vertex
*/
* construct delete statement for vertex
*/
def toDeleteExecuteSentence(vertices: Vertices, deleteEdge: Boolean): String = {
{ if (deleteEdge) BATCH_DELETE_VERTEX_WITH_EDGE_TEMPLATE else BATCH_DELETE_VERTEX_TEMPLATE }
.format(
Expand Down Expand Up @@ -96,8 +105,8 @@ abstract class ServerBaseWriter extends Writer {
}

/**
* construct update statement for vertex
*/
* construct update statement for vertex
*/
def toUpdateExecuteSentence(tagName: String, vertices: Vertices): String = {
vertices.values
.map { vertex =>
Expand Down Expand Up @@ -130,8 +139,8 @@ abstract class ServerBaseWriter extends Writer {
}

/**
* construct insert statement for edge
*/
* construct insert statement for edge
*/
def toExecuteSentence(name: String, edges: Edges, ignoreIndex: Boolean): String = {
val values = edges.values
.map { edge =>
Expand Down Expand Up @@ -175,8 +184,8 @@ abstract class ServerBaseWriter extends Writer {
}

/**
* construct delete statement for edge
*/
* construct delete statement for edge
*/
def toDeleteExecuteSentence(edgeName: String, edges: Edges): String = {
BATCH_DELETE_EDGE_TEMPLATE.format(
Type.EDGE.toString,
Expand Down Expand Up @@ -212,48 +221,48 @@ abstract class ServerBaseWriter extends Writer {
}

/**
* construct update statement for edge
*/
* construct update statement for edge
*/
def toUpdateExecuteSentence(edgeName: String, edges: Edges): String = {
edges.values
.map { edge =>
var index = 0
val rank = if (edge.ranking.isEmpty) { 0 } else { edge.ranking.get }
UPDATE_EDGE_TEMPLATE.format(
Type.EDGE.toString,
edgeName,
edges.sourcePolicy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.source)
case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, edge.source)
case None =>
edge.source
case _ =>
throw new IllegalArgumentException(
s"source policy ${edges.sourcePolicy.get} is not supported")
},
edges.targetPolicy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination)
case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination)
case None =>
edge.destination
case _ =>
throw new IllegalArgumentException(
s"target policy ${edges.targetPolicy.get} is not supported")
},
rank,
edge.values
.map { value =>
val updateValue =
UPDATE_VALUE_TEMPLATE.format(edges.names.get(index), value)
index += 1
updateValue
}
.mkString(",")
)
var index = 0
val rank = if (edge.ranking.isEmpty) { 0 } else { edge.ranking.get }
UPDATE_EDGE_TEMPLATE.format(
Type.EDGE.toString,
edgeName,
edges.sourcePolicy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.source)
case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, edge.source)
case None =>
edge.source
case _ =>
throw new IllegalArgumentException(
s"source policy ${edges.sourcePolicy.get} is not supported")
},
edges.targetPolicy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination)
case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, edge.destination)
case None =>
edge.destination
case _ =>
throw new IllegalArgumentException(
s"target policy ${edges.targetPolicy.get} is not supported")
},
rank,
edge.values
.map { value =>
val updateValue =
UPDATE_VALUE_TEMPLATE.format(edges.names.get(index), value)
index += 1
updateValue
}
.mkString(",")
)
}
.mkString(";")
}
Expand Down Expand Up @@ -287,9 +296,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,

def prepare(): Unit = {
val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space)
if (!switchResult.isSucceeded) {
if (!switchResult._2.isSucceeded) {
this.close()
throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
throw new RuntimeException("Switch Failed for " + switchResult._2.getErrorMessage)
}

LOG.info(s">>>>>> Connection to ${dataBaseConfigEntry.graphAddress}")
Expand Down Expand Up @@ -327,15 +336,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode)
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
if (result._2.isSucceeded) {
LOG.info(
s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})")
s">>>>> write ${config.name}, batch size(${vertices.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency})")
return null
}
LOG.error(s">>>>> write vertex failed for ${result.getErrorMessage} statement: \n $statement")
if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
LOG.error(
s">>>>> write vertex failed for ${result._2.getErrorMessage} statement: \n $statement")
if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}")
s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}")
}
} else {
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
Expand All @@ -347,15 +357,15 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode)
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
if (result._2.isSucceeded) {
LOG.info(
s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)")
s">>>>>> write ${config.name}, batch size(${edges.values.size}), graph(${result._1.toString}), latency(${result._2.getLatency}us)")
return null
}
LOG.error(s">>>>>> write edge failed for ${result.getErrorMessage}")
if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
LOG.error(s">>>>>> write edge failed for ${result._2.getErrorMessage}")
if (result._2.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}")
s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}")
}
} else {
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
Expand All @@ -366,10 +376,10 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
override def writeNgql(ngql: String): String = {
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, ngql)
if (result.isSucceeded) {
if (result._2.isSucceeded) {
return null
}
LOG.error(s">>>>>> reimport ngql failed for ${result.getErrorMessage}")
LOG.error(s">>>>>> reimport ngql failed for ${result._2.getErrorMessage}")
} else {
LOG.error(s">>>>>> reimport ngql failed because write speed is too fast")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ class GraphProviderSuite {
@Test
def switchSpaceSuite(): Unit = {
session = graphProvider.getGraphClient(userConfig)
assert(graphProvider.switchSpace(session, "test_string").isSucceeded)
assert(graphProvider.switchSpace(session, "test_int").isSucceeded)
assert(graphProvider.switchSpace(session, "test_string")._2.isSucceeded)
assert(graphProvider.switchSpace(session, "test_int")._2.isSucceeded)
graphProvider.releaseGraphClient(session)
}

@Test
def submitSuite(): Unit = {
session = graphProvider.getGraphClient(userConfig)
assert(graphProvider.submit(session, "show hosts").isSucceeded)
assert(graphProvider.submit(session, "show hosts")._2.isSucceeded)
graphProvider.releaseGraphClient(session)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,17 @@ object Exchange {
}
}
spark.close()
val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble
LOG.info(
s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0)
.formatted("%.2f")}s \n" +
s"\n>>>>>> exchange job finished, cost ${duration}s \n" +
s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" +
s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" +
s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" +
s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" +
s">>>>>> total SST failure:${totalSstRecordFailure} \n" +
s">>>>>> total SST Success:${totalSstRecordSuccess}")
LOG.info(
s">>>>>> exchange import qps: ${(totalClientRecordSuccess / duration).formatted("%.2f")}/s")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,16 @@ object Exchange {
}
}
spark.close()
val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble
LOG.info(
s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0)
.formatted("%.2f")}s \n" +
s"\n>>>>>> exchange job finished, cost ${duration}s \n" +
s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" +
s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" +
s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" +
s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" +
s">>>>>> total SST failure:${totalSstRecordFailure} \n" +
s">>>>>> total SST Success:${totalSstRecordSuccess}")
LOG.info(s">>>>>> exchange import qps: ${(totalClientRecordSuccess/duration).formatted("%.2f")}/s")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,17 @@ object Exchange {
}
}
spark.close()
val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble
LOG.info(
s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0)
.formatted("%.2f")}s \n" +
s"\n>>>>>> exchange job finished, cost ${duration}s \n" +
s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" +
s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" +
s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" +
s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" +
s">>>>>> total SST failure:${totalSstRecordFailure} \n" +
s">>>>>> total SST Success:${totalSstRecordSuccess}")
LOG.info(
s">>>>>> exchange import qps: ${(totalClientRecordSuccess / duration).formatted("%.2f")}/s")
}

/**
Expand Down

0 comments on commit c094a21

Please sign in to comment.