Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support excluding specific tags or edges when migrating a space #93

Merged
merged 2 commits into from
Apr 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import scala.collection.mutable.ListBuffer
* --conf spark.executor.extraClassPath=./ \
* --jars commons-cli-1.4.jar \
* --class com.vesoft.nebula.examples.connector.Nebula2Nebula example-3.0-SNAPSHOT-jar-with-dependencies.jar \
* -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2
* -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2 -timeout 50000 -u root -passwd nebula
*
*/
object Nebula2Nebula {
Expand All @@ -55,18 +55,38 @@ object Nebula2Nebula {

val sourceMetaOption =
new Option("sourceMeta", "sourceMetaAddress", true, "source nebulagraph metad address")
sourceMetaOption.setRequired(true)
val sourceSpaceOption =
new Option("sourceSpace", "sourceSpace", true, "source nebulagraph space name")
sourceSpaceOption.setRequired(true)
val limitOption =
new Option("limit", "limit", true, "records for one reading request for reading")
limitOption.setRequired(true)
val targetMetaOption =
new Option("targetMeta", "targetMetaAddress", true, "target nebulagraph metad address")
targetMetaOption.setRequired(true)
val targetGraphOption =
new Option("targetGraph", "targetGraphAddress", true, "target nebulagraph graphd address")
targetGraphOption.setRequired(true)
val targetSpaceOption =
new Option("targetSpace", "targetSpace", true, "target nebulagraph space name")
val batchOption = new Option("batch", "batch", true, "batch size for one insert request")
targetSpaceOption.setRequired(true)
val batchOption = new Option("batch", "batch", true, "batch size for one insert request")
batchOption.setRequired(true)
val writeParallelOption = new Option("p", "parallel", true, "parallel for writing data")
writeParallelOption.setRequired(true)
val timeoutOption = new Option("timeout", "timeout", true, "timeout for java client");
timeoutOption.setRequired(true)
val userOption = new Option("u", "user", true, "user")
userOption.setRequired(true)
val passwdOption = new Option("passwd", "password", true, "password")
passwdOption.setRequired(true)

// filter out some tags / edges
val excludeTagsOption =
new Option("excludeTags", "excludeTags", true, "filter out these tags, separate with `,`")
val excludeEdgesOption =
new Option("excludeEdges", "excludeEdges", true, "filter out these edges, separate with `,`")

val options = new Options
options.addOption(sourceMetaOption)
Expand All @@ -77,13 +97,19 @@ object Nebula2Nebula {
options.addOption(targetSpaceOption)
options.addOption(batchOption)
options.addOption(writeParallelOption)
options.addOption(timeoutOption)
options.addOption(userOption)
options.addOption(passwdOption)
options.addOption(excludeTagsOption)
options.addOption(excludeEdgesOption)

var cli: CommandLine = null
val cliParser: CommandLineParser = new DefaultParser
val helpFormatter = new HelpFormatter

try cli = cliParser.parse(options, args)
catch {
try {
cli = cliParser.parse(options, args)
} catch {
case e: ParseException =>
helpFormatter.printHelp(">>>> options", options)
e.printStackTrace()
Expand All @@ -98,27 +124,45 @@ object Nebula2Nebula {
val targetSpace: String = cli.getOptionValue("targetSpace")
val batch: Int = cli.getOptionValue("batch").toInt
val parallel: Int = cli.getOptionValue("p").toInt
val timeout: Int = cli.getOptionValue("timeout").toInt
val user: String = cli.getOptionValue("u")
val passed: String = cli.getOptionValue("passwd")
val excludeTags: List[String] =
if (cli.hasOption("excludeTags")) cli.getOptionValue("excludeTags").split(",").toList
else List()
val excludeEdges: List[String] =
if (cli.hasOption("excludeEdges")) cli.getOptionValue("excludeEdges").split(",").toList
else List()

// common config
val sourceConnectConfig =
NebulaConnectionConfig
.builder()
.withMetaAddress(sourceMetaAddr)
.withConenctionRetry(2)
.withTimeout(timeout)
.build()

val targetConnectConfig =
NebulaConnectionConfig
.builder()
.withMetaAddress(targetMetaAddr)
.withGraphAddress(targetGraphAddr)
.withTimeout(timeout)
.withConenctionRetry(2)
.build()

val metaHostAndPort = sourceMetaAddr.split(":")
val (tags, edges, partitions) =
var (tags, edges, partitions) =
getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace)

// Remove every excluded tag / edge name, wherever it appears in the list.
// `dropWhile` only strips the leading run of matching elements, so an excluded
// name that follows a non-excluded one would still be migrated; `filterNot`
// removes all matches regardless of position.
if (excludeTags.nonEmpty) {
  tags = tags.filterNot(tag => excludeTags.contains(tag))
}
if (excludeEdges.nonEmpty) {
  edges = edges.filterNot(edge => excludeEdges.contains(edge))
}

tags.foreach(tag => {
syncTag(spark,
sourceConnectConfig,
Expand All @@ -129,7 +173,9 @@ object Nebula2Nebula {
targetSpace,
batch,
tag,
parallel)
parallel,
user,
passed)
})

edges.foreach(edge => {
Expand All @@ -142,9 +188,10 @@ object Nebula2Nebula {
targetSpace,
batch,
edge,
parallel)
parallel,
user,
passed)
})

}

def getTagsAndEdges(metaHost: String,
Expand Down Expand Up @@ -176,7 +223,9 @@ object Nebula2Nebula {
targetSpace: String,
batch: Int,
tag: String,
writeParallel: Int): Unit = {
writeParallel: Int,
user: String,
passwd: String): Unit = {
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace(sourceSpace)
Expand All @@ -194,6 +243,8 @@ object Nebula2Nebula {
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace(targetSpace)
.withUser(user)
.withPasswd(passwd)
.withTag(tag)
.withVidField("_vertexId")
.withBatch(batch)
Expand All @@ -211,7 +262,9 @@ object Nebula2Nebula {
targetSpace: String,
batch: Int,
edge: String,
writeParallel: Int): Unit = {
writeParallel: Int,
user: String,
passwd: String): Unit = {
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace(sourceSpace)
Expand All @@ -229,6 +282,8 @@ object Nebula2Nebula {
val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace(targetSpace)
.withUser(user)
.withPasswd(passwd)
.withEdge(edge)
.withSrcIdField("_srcId")
.withDstIdField("_dstId")
Expand Down