diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala index 40eda062..2b0f0f8b 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala @@ -36,7 +36,7 @@ import scala.collection.mutable.ListBuffer * --conf spark.executor.extraClassPath=./ \ * --jars commons-cli-1.4.jar \ * --class com.vesoft.nebula.examples.connector.Nebula2Nebula example-3.0-SNAPSHOT-jar-with-dependencies.jar \ - * -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2 + * -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2 -timeout 50000 -u root -passwd nebula * */ object Nebula2Nebula { @@ -55,18 +55,38 @@ object Nebula2Nebula { val sourceMetaOption = new Option("sourceMeta", "sourceMetaAddress", true, "source nebulagraph metad address") + sourceMetaOption.setRequired(true) val sourceSpaceOption = new Option("sourceSpace", "sourceSpace", true, "source nebulagraph space name") + sourceSpaceOption.setRequired(true) val limitOption = new Option("limit", "limit", true, "records for one reading request for reading") + limitOption.setRequired(true) val targetMetaOption = new Option("targetMeta", "targetMetaAddress", true, "target nebulagraph metad address") + targetMetaOption.setRequired(true) val targetGraphOption = new Option("targetGraph", "targetGraphAddress", true, "target nebulagraph graphd address") + targetGraphOption.setRequired(true) val targetSpaceOption = new Option("targetSpace", "targetSpace", true, "target nebulagraph space name") - val batchOption = new Option("batch", "batch", true, "batch size for one insert request") + targetSpaceOption.setRequired(true) + val batchOption = new Option("batch", "batch", true, "batch size for one insert request") + batchOption.setRequired(true) val writeParallelOption = new Option("p", "parallel", true, "parallel for writing data") + writeParallelOption.setRequired(true) + val timeoutOption = new Option("timeout", "timeout", true, "timeout for java client"); + timeoutOption.setRequired(true) + val userOption = new Option("u", "user", true, "user") + userOption.setRequired(true) + val passwdOption = new Option("passwd", "password", true, "password") + passwdOption.setRequired(true) + + // filter out some tags /edges + val excludeTagsOption = + new Option("excludeTags", "excludeTags", true, "filter out these tags, separate with `,`") + val excludeEdgesOption = + new Option("excludeEdges", "excludeEdges", true, "filter out these edges, separate with `,`") val options = new Options options.addOption(sourceMetaOption) @@ -77,13 +97,19 @@ object Nebula2Nebula { options.addOption(targetSpaceOption) options.addOption(batchOption) options.addOption(writeParallelOption) + options.addOption(timeoutOption) + options.addOption(userOption) + options.addOption(passwdOption) + options.addOption(excludeTagsOption) + options.addOption(excludeEdgesOption) var cli: CommandLine = null val cliParser: CommandLineParser = new DefaultParser val helpFormatter = new HelpFormatter - try cli = cliParser.parse(options, args) - catch { + try { + cli = cliParser.parse(options, args) + } catch { case e: ParseException => helpFormatter.printHelp(">>>> options", options) e.printStackTrace() @@ -98,6 +124,15 @@ object Nebula2Nebula { val targetSpace: String = cli.getOptionValue("targetSpace") val batch: Int = cli.getOptionValue("batch").toInt val parallel: Int = cli.getOptionValue("p").toInt + val timeout: Int = cli.getOptionValue("timeout").toInt + val user: String = cli.getOptionValue("u") + val passed: String = cli.getOptionValue("passwd") + val excludeTags: List[String] = + if (cli.hasOption("excludeTags")) cli.getOptionValue("excludeTags").split(",").toList + else List() + val excludeEdges: List[String] = + if (cli.hasOption("excludeEdges")) cli.getOptionValue("excludeEdges").split(",").toList + else List() // common config val sourceConnectConfig = @@ -105,6 +140,7 @@ object Nebula2Nebula { .builder() .withMetaAddress(sourceMetaAddr) .withConenctionRetry(2) + .withTimeout(timeout) .build() val targetConnectConfig = @@ -112,13 +148,21 @@ object Nebula2Nebula { .builder() .withMetaAddress(targetMetaAddr) .withGraphAddress(targetGraphAddr) + .withTimeout(timeout) .withConenctionRetry(2) .build() val metaHostAndPort = sourceMetaAddr.split(":") - val (tags, edges, partitions) = + var (tags, edges, partitions) = getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace) + if (excludeTags.nonEmpty) { + tags = tags.dropWhile(ele => excludeTags.contains(ele)) + } + if (excludeEdges.nonEmpty) { + edges = edges.dropWhile(ele => excludeEdges.contains(ele)) + } + tags.foreach(tag => { syncTag(spark, sourceConnectConfig, @@ -129,7 +173,9 @@ object Nebula2Nebula { targetSpace, batch, tag, - parallel) + parallel, + user, + passed) }) edges.foreach(edge => { @@ -142,9 +188,10 @@ object Nebula2Nebula { targetSpace, batch, edge, - parallel) + parallel, + user, + passed) }) - } def getTagsAndEdges(metaHost: String, @@ -176,7 +223,9 @@ object Nebula2Nebula { targetSpace: String, batch: Int, tag: String, - writeParallel: Int): Unit = { + writeParallel: Int, + user: String, + passwd: String): Unit = { val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace(sourceSpace) @@ -194,6 +243,8 @@ object Nebula2Nebula { val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig .builder() .withSpace(targetSpace) + .withUser(user) + .withPasswd(passwd) .withTag(tag) .withVidField("_vertexId") .withBatch(batch) @@ -211,7 +262,9 @@ object Nebula2Nebula { targetSpace: String, batch: Int, edge: String, - writeParallel: Int): Unit = { + writeParallel: Int, + user: String, + passwd: String): Unit = { val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace(sourceSpace) @@ -229,6 +282,8 @@ object Nebula2Nebula { val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig .builder() .withSpace(targetSpace) + .withUser(user) + .withPasswd(passwd) .withEdge(edge) .withSrcIdField("_srcId") .withDstIdField("_dstId")