From e8d5edfaedd0f7ed8d3083dc091dcfef048ca88a Mon Sep 17 00:00:00 2001 From: Anqi Date: Tue, 18 Apr 2023 11:31:43 +0800 Subject: [PATCH 1/2] support to exclude specific tags or edges when migrate space --- .../examples/connector/Nebula2Nebula.scala | 73 ++++++++++++++++--- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala index 40eda062..5499f67f 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala @@ -36,7 +36,7 @@ import scala.collection.mutable.ListBuffer * --conf spark.executor.extraClassPath=./ \ * --jars commons-cli-1.4.jar \ * --class com.vesoft.nebula.examples.connector.Nebula2Nebula example-3.0-SNAPSHOT-jar-with-dependencies.jar \ - * -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2 + * -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2 -timeout 50000 -u root -passwd nebula * */ object Nebula2Nebula { @@ -55,18 +55,38 @@ object Nebula2Nebula { val sourceMetaOption = new Option("sourceMeta", "sourceMetaAddress", true, "source nebulagraph metad address") + sourceMetaOption.setRequired(true) val sourceSpaceOption = new Option("sourceSpace", "sourceSpace", true, "source nebulagraph space name") + sourceSpaceOption.setRequired(true) val limitOption = new Option("limit", "limit", true, "records for one reading request for reading") + limitOption.setRequired(true) val targetMetaOption = new Option("targetMeta", "targetMetaAddress", true, "target nebulagraph metad address") + targetMetaOption.setRequired(true) val targetGraphOption = new 
Option("targetGraph", "targetGraphAddress", true, "target nebulagraph graphd address") + targetGraphOption.setRequired(true) val targetSpaceOption = new Option("targetSpace", "targetSpace", true, "target nebulagraph space name") - val batchOption = new Option("batch", "batch", true, "batch size for one insert request") + targetSpaceOption.setRequired(true) + val batchOption = new Option("batch", "batch", true, "batch size for one insert request") + batchOption.setRequired(true) val writeParallelOption = new Option("p", "parallel", true, "parallel for writing data") + writeParallelOption.setRequired(true) + val timeoutOption = new Option("timeout", "timeout", true, "timeout for java client"); + timeoutOption.setRequired(true) + val userOption = new Option("u", "user", true, "user") + userOption.setRequired(true) + val passwdOption = new Option("passwd", "password", true, "password") + passwdOption.setRequired(true) + + // filter out some tags /edges + val missTagsOption = + new Option("misstags", "misstags", true, "filter out these tags, separate with `,`") + val missEdgesOption = + new Option("missedges", "missedges", true, "filter out these edges, separate with `,`") val options = new Options options.addOption(sourceMetaOption) @@ -77,13 +97,19 @@ object Nebula2Nebula { options.addOption(targetSpaceOption) options.addOption(batchOption) options.addOption(writeParallelOption) + options.addOption(timeoutOption) + options.addOption(userOption) + options.addOption(passwdOption) + options.addOption(missTagsOption) + options.addOption(missEdgesOption) var cli: CommandLine = null val cliParser: CommandLineParser = new DefaultParser val helpFormatter = new HelpFormatter - try cli = cliParser.parse(options, args) - catch { + try { + cli = cliParser.parse(options, args) + } catch { case e: ParseException => helpFormatter.printHelp(">>>> options", options) e.printStackTrace() @@ -98,6 +124,13 @@ object Nebula2Nebula { val targetSpace: String = 
cli.getOptionValue("targetSpace") val batch: Int = cli.getOptionValue("batch").toInt val parallel: Int = cli.getOptionValue("p").toInt + val timeout: Int = cli.getOptionValue("timeout").toInt + val user: String = cli.getOptionValue("u") + val passed: String = cli.getOptionValue("passwd") + val misstags: List[String] = + if (cli.hasOption("misstags")) cli.getOptionValue("misstags").split(",").toList else List() + val missedges: List[String] = + if (cli.hasOption("missedges")) cli.getOptionValue("missedges").split(",").toList else List() // common config val sourceConnectConfig = @@ -105,6 +138,7 @@ object Nebula2Nebula { .builder() .withMetaAddress(sourceMetaAddr) .withConenctionRetry(2) + .withTimeout(timeout) .build() val targetConnectConfig = @@ -112,13 +146,21 @@ object Nebula2Nebula { .builder() .withMetaAddress(targetMetaAddr) .withGraphAddress(targetGraphAddr) + .withTimeout(timeout) .withConenctionRetry(2) .build() val metaHostAndPort = sourceMetaAddr.split(":") - val (tags, edges, partitions) = + var (tags, edges, partitions) = getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace) + if (misstags.nonEmpty) { + tags = tags.dropWhile(ele => misstags.contains(ele)) + } + if (missedges.nonEmpty) { + edges = edges.dropWhile(ele => missedges.contains(ele)) + } + tags.foreach(tag => { syncTag(spark, sourceConnectConfig, @@ -129,7 +171,9 @@ object Nebula2Nebula { targetSpace, batch, tag, - parallel) + parallel, + user, + passed) }) edges.foreach(edge => { @@ -142,9 +186,10 @@ object Nebula2Nebula { targetSpace, batch, edge, - parallel) + parallel, + user, + passed) }) - } def getTagsAndEdges(metaHost: String, @@ -176,7 +221,9 @@ object Nebula2Nebula { targetSpace: String, batch: Int, tag: String, - writeParallel: Int): Unit = { + writeParallel: Int, + user: String, + passwd: String): Unit = { val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace(sourceSpace) @@ -194,6 +241,8 @@ object Nebula2Nebula { val 
nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig .builder() .withSpace(targetSpace) + .withUser(user) + .withPasswd(passwd) .withTag(tag) .withVidField("_vertexId") .withBatch(batch) @@ -211,7 +260,9 @@ object Nebula2Nebula { targetSpace: String, batch: Int, edge: String, - writeParallel: Int): Unit = { + writeParallel: Int, + user: String, + passwd: String): Unit = { val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace(sourceSpace) @@ -229,6 +280,8 @@ object Nebula2Nebula { val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig .builder() .withSpace(targetSpace) + .withUser(user) + .withPasswd(passwd) .withEdge(edge) .withSrcIdField("_srcId") .withDstIdField("_dstId") From 10be7b0c4da0613ae0ac6eb75b3c5f6714c81281 Mon Sep 17 00:00:00 2001 From: Anqi Date: Tue, 18 Apr 2023 11:56:34 +0800 Subject: [PATCH 2/2] update variable name --- .../examples/connector/Nebula2Nebula.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala index 5499f67f..2b0f0f8b 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala @@ -83,10 +83,10 @@ object Nebula2Nebula { passwdOption.setRequired(true) // filter out some tags /edges - val missTagsOption = - new Option("misstags", "misstags", true, "filter out these tags, separate with `,`") - val missEdgesOption = - new Option("missedges", "missedges", true, "filter out these edges, separate with `,`") + val excludeTagsOption = + new Option("excludeTags", "excludeTags", true, "filter out these tags, separate with `,`") + val excludeEdgesOption = + new Option("excludeEdges", "excludeEdges", true, "filter out these edges, separate with `,`") val options 
= new Options options.addOption(sourceMetaOption) @@ -100,8 +100,8 @@ object Nebula2Nebula { options.addOption(timeoutOption) options.addOption(userOption) options.addOption(passwdOption) - options.addOption(missTagsOption) - options.addOption(missEdgesOption) + options.addOption(excludeTagsOption) + options.addOption(excludeEdgesOption) var cli: CommandLine = null val cliParser: CommandLineParser = new DefaultParser @@ -127,10 +127,12 @@ object Nebula2Nebula { val timeout: Int = cli.getOptionValue("timeout").toInt val user: String = cli.getOptionValue("u") val passed: String = cli.getOptionValue("passwd") - val misstags: List[String] = - if (cli.hasOption("misstags")) cli.getOptionValue("misstags").split(",").toList else List() - val missedges: List[String] = - if (cli.hasOption("missedges")) cli.getOptionValue("missedges").split(",").toList else List() + val excludeTags: List[String] = + if (cli.hasOption("excludeTags")) cli.getOptionValue("excludeTags").split(",").toList + else List() + val excludeEdges: List[String] = + if (cli.hasOption("excludeEdges")) cli.getOptionValue("excludeEdges").split(",").toList + else List() // common config val sourceConnectConfig = @@ -154,11 +156,11 @@ object Nebula2Nebula { var (tags, edges, partitions) = getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace) - if (misstags.nonEmpty) { - tags = tags.dropWhile(ele => misstags.contains(ele)) + if (excludeTags.nonEmpty) { + tags = tags.filterNot(ele => excludeTags.contains(ele)) } - if (missedges.nonEmpty) { - edges = edges.dropWhile(ele => missedges.contains(ele)) + if (excludeEdges.nonEmpty) { + edges = edges.filterNot(ele => excludeEdges.contains(ele)) } tags.foreach(tag => {