diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 0f4557d2..95156690 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -243,7 +243,7 @@ object Configs {
   private[this] val DEFAULT_ENABLE_SSL       = false
   private[this] val DEFAULT_SSL_SIGN_TYPE    = "CA"
   private[this] val DEFAULT_EDGE_RANKING     = 0L
-  private[this] val DEFAULT_BATCH            = 2
+  private[this] val DEFAULT_BATCH            = 500
   private[this] val DEFAULT_PARTITION        = -1
   private[this] val DEFAULT_CHECK_POINT_PATH = None
   private[this] val DEFAULT_LOCAL_PATH       = None
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala
index 19fe4b4c..fbed3074 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala
@@ -133,13 +133,14 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
     if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
       val result = graphProvider.submit(session, sentence)
       if (result.isSucceeded) {
+        LOG.info(
+          s" write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency}us)")
         return null
       }
       LOG.error(s"write vertex failed for ${result.getErrorMessage}")
     } else {
       LOG.error(s"write vertex failed because write speed is too fast")
     }
-    LOG.info(sentence)
     sentence
   }
 
@@ -148,13 +149,14 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
     if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
       val result = graphProvider.submit(session, sentence)
       if (result.isSucceeded) {
+        LOG.info(
+          s" write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)")
         return null
       }
       LOG.error(s"write edge failed for ${result.getErrorMessage}")
     } else {
       LOG.error(s"write vertex failed because write speed is too fast")
     }
-    LOG.info(sentence)
     sentence
   }
 
@@ -168,7 +170,6 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
     } else {
       LOG.error(s"reimport ngql failed because write speed is too fast")
     }
-    LOG.info(ngql)
     ngql
   }
 
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 79e9d1e8..67b2a646 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -142,6 +142,8 @@ object Exchange {
         val fields = tagConfig.vertexField :: tagConfig.fields
         val data   = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
+          data.get.cache()
+          val count     = data.get.count()
           val startTime = System.currentTimeMillis()
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -159,8 +161,10 @@
             batchFailure
           )
           processor.process()
+          data.get.unpersist()
           val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-          LOG.info(s"import for tag ${tagConfig.name} cost time: ${costTime} s")
+          LOG.info(
+            s"import for tag ${tagConfig.name}: data total count: $count, total time: ${costTime}s")
           if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
@@ -191,6 +195,8 @@
         }
         val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
+          data.get.cache()
+          val count        = data.get.count()
           val startTime    = System.currentTimeMillis()
           val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
           val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
@@ -206,8 +212,10 @@
             batchFailure
           )
           processor.process()
+          data.get.unpersist()
           val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-          LOG.info(s"import for edge ${edgeConfig.name} cost time: ${costTime} s")
+          LOG.info(
+            s"import for edge ${edgeConfig.name}: data total count: $count, total time: ${costTime}s")
           if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 9b4e5ef2..74bf70de 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -142,6 +142,8 @@ object Exchange {
         val fields = tagConfig.vertexField :: tagConfig.fields
         val data   = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
+          data.get.cache()
+          val count     = data.get.count()
           val startTime = System.currentTimeMillis()
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -159,8 +161,9 @@
             batchFailure
           )
           processor.process()
+          data.get.unpersist()
           val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-          LOG.info(s"import for tag ${tagConfig.name} cost time: ${costTime} s")
+          LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s")
           if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
@@ -191,6 +194,8 @@
         }
         val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
+          data.get.cache()
+          val count        = data.get.count()
           val startTime    = System.currentTimeMillis()
           val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
           val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
@@ -206,8 +211,10 @@
             batchFailure
           )
           processor.process()
+          data.get.unpersist()
           val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-          LOG.info(s"import for edge ${edgeConfig.name} cost time: ${costTime} s")
+          LOG.info(
+            s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s")
           if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 95019c83..07ae6559 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -142,6 +142,8 @@ object Exchange {
         val fields = tagConfig.vertexField :: tagConfig.fields
         val data   = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
+          data.get.cache()
+          val count     = data.get.count()
           val startTime = System.currentTimeMillis()
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -159,8 +161,9 @@
             batchFailure
           )
           processor.process()
+          data.get.unpersist()
           val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-          LOG.info(s"import for tag ${tagConfig.name} cost time: ${costTime} s")
+          LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s")
           if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
@@ -191,6 +194,8 @@
         }
         val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
+          data.get.cache()
+          val count        = data.get.count()
           val startTime    = System.currentTimeMillis()
           val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
           val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
@@ -206,8 +211,10 @@
             batchFailure
           )
           processor.process()
+          data.get.unpersist()
           val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-          LOG.info(s"import for edge ${edgeConfig.name} cost time: ${costTime} s")
+          LOG.info(
+            s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s")
           if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")