From e57651a99e815cbb4cd4df114b88e58a01fc3dcf Mon Sep 17 00:00:00 2001
From: Anqi
Date: Mon, 22 Jan 2024 09:56:41 +0800
Subject: [PATCH] support to config the encrypted password (#192)

---
 README-CN.md                                        |  23 +
 README.md                                           |  24 +-
 .../exchange/common/PasswordEncryption.scala        |  94 +++
 .../exchange/common/config/Configs.scala            |  11 +-
 .../application_encrypt_password.conf               | 679 ++++++++++++++++++
 5 files changed, 828 insertions(+), 3 deletions(-)
 create mode 100644 exchange-common/src/main/scala/com/vesoft/exchange/common/PasswordEncryption.scala
 create mode 100644 nebula-exchange_spark_2.4/src/main/resources/application_encrypt_password.conf

diff --git a/README-CN.md b/README-CN.md
index 4076c913..8fc0c6c9 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -59,6 +59,28 @@ java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.G
 {target-path-to-save-config-file}
 ```

+## Encrypt the NebulaGraph password
+```
+spark-submit --master local --class com.vesoft.exchange.common.PasswordEncryption nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -p {password}
+```
+Encrypting the password `nebula` outputs the RSA public key, the private key, and the encrypted password. Example:
+```
+=================== public key begin ===================
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCLl7LaNSEXlZo2hYiJqzxgyFBQdkxbQXYU/xQthsBJwjOPhkiY37nokzKnjNlp6mv5ZUomqxLsoNQHEJ6BZD4VPiaiElFAkTD+gyul1v8f3A446Fr2rnVLogWHnz8ECPt7X8jwmpiKOXkOPIhqU5E0Cua+Kk0nnVosbos/VShfiQIDAQAB
+=================== public key end ===================
+
+
+=================== private key begin ===================
+MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAIuXsto1IReVmjaFiImrPGDIUFB2TFtBdhT/FC2GwEnCM4+GSJjfueiTMqeM2Wnqa/llSiarEuyg1AcQnoFkPhU+JqISUUCRMP6DK6XW/x/cDjjoWvaudUuiBYefPwQI+3tfyPCamIo5eQ48iGpTkTQK5r4qTSedWixuiz9VKF+JAgMBAAECgYADWbfEPwQ1UbTq3Bej3kVLuWMcG0rH4fFYnaq5UQOqgYvFRR7W9H+80lOj6+CIB0ViLgkylmaU4WNVbBOx3VsUFFWSqIIIviKubg8m8ey7KAd9X2wMEcUHi4JyS2+/WSacaXYS5LOmMevvuaOwLEV0QmyM+nNGRIjUdzCLR1935QJBAM+IF8YD5GnoAPPjGIDS1Ljhu/u/Gj6/YBCQKSHQ5+HxHEKjQ/YxQZ/otchmMZanYelf1y+byuJX3NZ04/KSGT8CQQCsMaoFO2rF5M84HpAXPi6yH2chbtz0VTKZworwUnpmMVbNUojf4VwzAyOhT1U5o0PpFbpi+NqQhC63VUN5k003AkEArI8vnVGNMlZbvG7e5/bmM9hWs2viSbxdB0inOtv2g1M1OV+B2gp405ru0/PNVcRV0HQFfCuhVfTSxmspQoAihwJBAJW6EZa/FZbB4JVxreUoAr6Lo8dkeOhT9M3SZbGWZivaFxot/Cp/8QXCYwbuzrJxjqlsZUeOD6694Uk08JkURn0CQQC8V6aRa8ylMhLJFkGkMDHLqHcQCmY53Kd73mUu4+mjMJLZh14zQD9ydFtc0lbLXTeBAMWV3uEdeLhRvdAo3OwV
+=================== private key end ===================
+
+
+=================== encrypted password begin ===================
+Io+3y3mLOMnZJJNUPHZ8pKb4VfTvg6wUh6jSu5xdmLAoX/59tK1HTwoN40aOOWJwa1a5io7S4JqcX/jEcAorw7pelITr+F4oB0AMCt71d+gJuu3/lw9bjUEl9tF4Raj82y2Dg39wYbagN84fZMgCD63TPiDIevSr6+MFKASpGrY=
+=================== encrypted password end ===================
+check: the real password decrypted by private key and encrypted password is: nebula
+```
+
 ## Version match

 The version correspondence between Exchange and NebulaGraph is as follows:
@@ -131,6 +153,7 @@ $SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
 nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
 -c application.conf
 ```
+8. *Since 3.7.0, Exchange supports configuring an RSA-encrypted NebulaGraph password and can generate the encrypted password.*

 For more information about Nebula Exchange, see the Exchange 2.0 [user manual](https://docs.nebula-graph.com.cn/2.6.2/nebula-exchange/about-exchange/ex-ug-what-is-exchange/).

diff --git a/README.md b/README.md
index d3100e00..250b211e 100644
--- a/README.md
+++ b/README.md
@@ -102,6 +102,28 @@ Such as your datasource is csv, and want to save the template config file in /tm
 java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s csv -p /tmp
 ```

+## Encrypt NebulaGraph's password
+```
+spark-submit --master local --class com.vesoft.exchange.common.PasswordEncryption nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -p {password}
+```
+When encrypting the password `nebula`, the output includes the RSA public key, the private key, and the encrypted password:
+```
+=================== public key begin ===================
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCLl7LaNSEXlZo2hYiJqzxgyFBQdkxbQXYU/xQthsBJwjOPhkiY37nokzKnjNlp6mv5ZUomqxLsoNQHEJ6BZD4VPiaiElFAkTD+gyul1v8f3A446Fr2rnVLogWHnz8ECPt7X8jwmpiKOXkOPIhqU5E0Cua+Kk0nnVosbos/VShfiQIDAQAB
+=================== public key end ===================
+
+
+=================== private key begin ===================
+MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAIuXsto1IReVmjaFiImrPGDIUFB2TFtBdhT/FC2GwEnCM4+GSJjfueiTMqeM2Wnqa/llSiarEuyg1AcQnoFkPhU+JqISUUCRMP6DK6XW/x/cDjjoWvaudUuiBYefPwQI+3tfyPCamIo5eQ48iGpTkTQK5r4qTSedWixuiz9VKF+JAgMBAAECgYADWbfEPwQ1UbTq3Bej3kVLuWMcG0rH4fFYnaq5UQOqgYvFRR7W9H+80lOj6+CIB0ViLgkylmaU4WNVbBOx3VsUFFWSqIIIviKubg8m8ey7KAd9X2wMEcUHi4JyS2+/WSacaXYS5LOmMevvuaOwLEV0QmyM+nNGRIjUdzCLR1935QJBAM+IF8YD5GnoAPPjGIDS1Ljhu/u/Gj6/YBCQKSHQ5+HxHEKjQ/YxQZ/otchmMZanYelf1y+byuJX3NZ04/KSGT8CQQCsMaoFO2rF5M84HpAXPi6yH2chbtz0VTKZworwUnpmMVbNUojf4VwzAyOhT1U5o0PpFbpi+NqQhC63VUN5k003AkEArI8vnVGNMlZbvG7e5/bmM9hWs2viSbxdB0inOtv2g1M1OV+B2gp405ru0/PNVcRV0HQFfCuhVfTSxmspQoAihwJBAJW6EZa/FZbB4JVxreUoAr6Lo8dkeOhT9M3SZbGWZivaFxot/Cp/8QXCYwbuzrJxjqlsZUeOD6694Uk08JkURn0CQQC8V6aRa8ylMhLJFkGkMDHLqHcQCmY53Kd73mUu4+mjMJLZh14zQD9ydFtc0lbLXTeBAMWV3uEdeLhRvdAo3OwV
+=================== private key end ===================
+
+
+=================== encrypted password begin ===================
+Io+3y3mLOMnZJJNUPHZ8pKb4VfTvg6wUh6jSu5xdmLAoX/59tK1HTwoN40aOOWJwa1a5io7S4JqcX/jEcAorw7pelITr+F4oB0AMCt71d+gJuu3/lw9bjUEl9tF4Raj82y2Dg39wYbagN84fZMgCD63TPiDIevSr6+MFKASpGrY=
+=================== encrypted password end ===================
+check: the real password decrypted by private key and encrypted password is: nebula
+```
+
 ## Version Compatibility Matrix

 Here is the version correspondence between Exchange and NebulaGraph:
@@ -137,7 +159,7 @@ Here is the version correspondence between Exchange and NebulaGraph:
 5. *Since 2.5* While SST import is supported by Exchange, property default values are not yet supported.
 6. *Since 3.0* Exchange is compatible with Spark 2.2, Spark 2.4, and Spark 3.0.
-
+7. *Since 3.7* Exchange supports configuring an RSA-encrypted NebulaGraph password and can generate the encrypted password.

 Refer to [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf) as an example to edit the configuration file.
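For reference, a minimal sketch of how the generated output maps onto the `nebula` block of the configuration file (field names follow the application_encrypt_password.conf added by this patch; the pswd and privateKey values are placeholders for your own generated output):

```
nebula: {
  user: root
  # paste the text between "encrypted password begin/end" here
  pswd: "<encrypted-password-from-PasswordEncryption>"
  # tell Exchange to RSA-decrypt pswd at startup
  enableRSA: true
  # paste the text between "private key begin/end" here
  privateKey: "<private-key-from-PasswordEncryption>"
}
```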
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/PasswordEncryption.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/PasswordEncryption.scala
new file mode 100644
index 00000000..3f263197
--- /dev/null
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/PasswordEncryption.scala
@@ -0,0 +1,94 @@
/* Copyright (c) 2023 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License.
 */

package com.vesoft.exchange.common

import java.security.spec.{PKCS8EncodedKeySpec, X509EncodedKeySpec}
import java.security.{KeyFactory, KeyPairGenerator, SecureRandom}
import java.util.Base64
import javax.crypto.Cipher

object PasswordEncryption {
  private val algorithm = "RSA"
  private val charset   = "UTF-8"

  def main(args: Array[String]): Unit = {
    val passwdOption = new scopt.OptionParser[PasswordConfig]("encrypt password") {
      head("encrypt password")

      opt[String]('p', "passwd")
        .required()
        .valueName("passwd")
        .action((x, c) => c.copy(password = x))
        .text("your real password")
    }.parse(args, PasswordConfig())

    require(passwdOption.isDefined && passwdOption.get.password != null,
            "lack of password parameter")

    val password: String = passwdOption.get.password

    val (encryptedPasswd, privateKey) = encryptPassword(password)
    println(s"=================== private key begin ===================")
    println(privateKey)
    println(s"=================== private key end ===================\n\n")

    println(s"=================== encrypted password begin ===================")
    println(encryptedPasswd)
    println(s"=================== encrypted password end ===================")

    println(s"check: the real password decrypted by private key and encrypted password is: ${decryptPassword(encryptedPasswd, privateKey)}")
  }

  /**
    * encrypt the password
    *
    * @param password real password
    * @return (encryptedPasswd, privateKey)
    */
  def encryptPassword(password: String): (String, String) = {
    val keyPairGenerator = KeyPairGenerator.getInstance(algorithm)
    keyPairGenerator.initialize(1024, new SecureRandom())
    val keyPair       = keyPairGenerator.generateKeyPair()
    val privateKey    = keyPair.getPrivate
    val privateKeyStr = new String(Base64.getEncoder.encode(privateKey.getEncoded), charset)
    val publicKey     = keyPair.getPublic
    val publicKeyStr  = new String(Base64.getEncoder.encode(publicKey.getEncoded), charset)
    println(s"=================== public key begin ===================")
    println(publicKeyStr)
    println(s"=================== public key end ===================\n\n")

    // encrypt the password with the public key
    val encoded      = Base64.getDecoder.decode(publicKeyStr)
    val rsaPublicKey = KeyFactory.getInstance(algorithm).generatePublic(new X509EncodedKeySpec(encoded))
    val cipher       = Cipher.getInstance(algorithm)
    cipher.init(Cipher.ENCRYPT_MODE, rsaPublicKey)
    val encodePasswd = new String(Base64.getEncoder.encode(cipher.doFinal(password.getBytes(charset))), charset)
    (encodePasswd, privateKeyStr)
  }

  /**
    * decrypt the encrypted password with the private key
    *
    * @param encryptedPassword encrypted password
    * @param privateKey        rsa private key
    * @return real password
    */
  def decryptPassword(encryptedPassword: String, privateKey: String): String = {
    val encryptedPasswdBytes = Base64.getDecoder.decode(encryptedPassword)
    val decodedPrivateKey    = Base64.getDecoder.decode(privateKey)
    val rsaPrivateKey =
      KeyFactory.getInstance(algorithm).generatePrivate(new PKCS8EncodedKeySpec(decodedPrivateKey))
    val cipher = Cipher.getInstance(algorithm)
    cipher.init(Cipher.DECRYPT_MODE, rsaPrivateKey)
    new String(cipher.doFinal(encryptedPasswdBytes), charset)
  }
}

final case class PasswordConfig(password: String = null)

diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 7819c452..5552c1c3 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -10,7 +10,7 @@ import java.nio.file.Files
 import com.google.common.net.HostAndPort
 import com.typesafe.config.{Config, ConfigFactory}
 import com.vesoft.exchange.Argument
-import com.vesoft.exchange.common.KeyPolicy
+import com.vesoft.exchange.common.{KeyPolicy, PasswordEncryption}
 import com.vesoft.exchange.common.utils.NebulaUtils
 import com.vesoft.nebula.client.graph.data.HostAddress
 import org.apache.hadoop.conf.Configuration
@@ -318,7 +318,14 @@ object Configs {
     val user = nebulaConfig.getString("user")
     val pswd = nebulaConfig.getString("pswd")

-    val userEntry = UserConfigEntry(user, pswd)
+    val enableRSA  = getOrElse(nebulaConfig, "enableRSA", false)
+    val privateKey = getStringOrNull(nebulaConfig, "privateKey")
+    require(!enableRSA || privateKey != null,
+            "when enableRSA is true, privateKey cannot be empty.")
+    val userEntry =
+      if (enableRSA) UserConfigEntry(user, PasswordEncryption.decryptPassword(pswd, privateKey))
+      else UserConfigEntry(user, pswd)
+
     LOG.info(s"User Config ${userEntry}")

     val connectionConfig = getConfigOrNone(nebulaConfig, "connection")

diff --git a/nebula-exchange_spark_2.4/src/main/resources/application_encrypt_password.conf b/nebula-exchange_spark_2.4/src/main/resources/application_encrypt_password.conf
new file mode 100644
index 00000000..bc1ba031
--- /dev/null
+++ b/nebula-exchange_spark_2.4/src/main/resources/application_encrypt_password.conf
@@ -0,0 +1,679 @@
{
  # Spark config
  spark: {
    app: {
      name: Nebula Exchange
    }
  }

  # if the Hive is hive-on-spark with derby mode, you can ignore this Hive config;
  # get the config values from the file $HIVE_HOME/conf/hive-site.xml or hive-default.xml

  # hive: {
  #   warehouse: "hdfs://NAMENODE_IP:9000/apps/svr/hive-xxx/warehouse/"
  #   connectionURL: "jdbc:mysql://your_ip:3306/hive_spark?characterEncoding=UTF-8"
  #   connectionDriverName: "com.mysql.jdbc.Driver"
  #   connectionUserName: "user"
  #   connectionPassword: "password"
  # }


  # NebulaGraph config
  nebula: {
    address:{
      graph:["127.0.0.1:9669"]
      meta:["127.0.0.1:9559"]
    }
    user: root
    # config the encrypted password.
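    # NOTE: the pswd and privateKey values below are sample output of
    # com.vesoft.exchange.common.PasswordEncryption (see the README for how to
    # generate them); replace both with values produced from your own password.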
    pswd: "U+ARk2DmbbrOKXGltsz4X/OjTYQhmz9UfBrekxczUoSd4kjGntqbcZ2ccJAABcj3KfIMRTFlNyJCzhuKGTMnDVDSWZQ22TN7jdIXMnD+tJWmGcw0hKdnocG2pfao7MSU9m0g9kxBwqMThXhTY1IY4RMWEvumcAu4RD0x3KfW6D4="
    enableRSA: true
    privateKey:"MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBALWQEbmzb3ZzDboUktSe2ZEwfa+nQ4tUMAMVN8miB6ZMH+FJdIeySx3e7sa3vnxCmnXoXpQ8Uw7K+1aXXAaAjXSOxv+jroNi/4cxv1Gd+REM1hNXt7RQ9s4NDkNIeMrY9sDxC/+wo30SWirD6DZeOnQKkjYSfp65A77eUBnbTY8NAgMBAAECgYAsT/QziXqrwxrXjovjfr2E9PvH8h/HEo4BQp1yBayuRqgFOQNK1xYEPVOzhkbkVvER09u5fBudYacEFD7ui1N6dLtwkwU7gYLUNs/tocGs54cvqZ5qlJKyM9S+lVzzuTBQBteYBv9TJC7Aheax622/oh6S81ya5OQUoME1S0cSgQJBAOK+xIb/qXNM5Prrf8LPpij+6djs8FbMRQzMph2sFHCbunvgNCuyBdITMzabcRSP/NT2X6SwAxpPlX7Sk8k/HaECQQDM/PVe8mJNk+iP2E9fU4AHg6EB+9mFJi916PCJF7HP2/113YhaVRAPCBJyJuPCYYONJxytCK+aLLwoUt6eUYHtAkEAtA7/q6CcpHYco/GdXEtfTNDZTK0jUhoWf0qIY6nB9YaE+VgatdRB8QfUNHoNu2F4Snl3jCVF4X/vXG3GdBqN4QJAVMEuTsCXBo5mNLtyLe7fW7v/4UsZZ+Ue8HiMo5JJcbD6vjYHf5n2FtD2l34e46rf8oe+YojZ2jwu29lPtFPtDQJAMOXvHArtXaVqxW5hfHfa81RTwc+UeQHnMTNv9tV0kptE3JacRt/EJdvmdSRscDdcr9vGiuwHjFZAZcpVgqBNew=="
    space: test
    # the version is decided by the NebulaGraph server.
    version: 3.0.0

    # config this ssl section if graph data is transmitted with SSL encryption
    ssl:{
      # if enable is false, the other ssl params are invalid.
      enable:{
        graph:false
        meta:false
      }
      # ssl sign type: CA or SELF
      sign.type:ca

      # if sign.type is CA, make sure to config the ca.param. If you submit the exchange application in cluster mode, make sure each worker has the ca files.
      ca.param: {
        caCrtFilePath:"/path/caCrtFilePath"
        crtFilePath:"/path/crtFilePath"
        keyFilePath:"/path/keyFilePath"
      }

      # if sign.type is SELF, make sure to config the self.param. If you submit the exchange application in cluster mode, make sure each worker has the ca files.
      self.param: {
        crtFilePath:"/path/crtFilePath"
        keyFilePath:"/path/keyFilePath"
        password:"nebula"
      }
    }


    # parameters for SST import, not required
    path:{
      local:"/tmp"
      remote:"/sst"
      hdfs.namenode: "hdfs://name_node:9000"
    }

    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures; if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import jobs will be recorded in the output path
      output: /tmp/errors
    }

    # use Google's RateLimiter to limit the requests sent to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different data sources.
  tags: [

    # HDFS parquet
    # Import mode is client, just change type.sink to sst if you want to use sst import mode.
    {
      name: tag-name-0
      type: {
        source: parquet
        sink: client
      }
      path: hdfs tag path 0

      fields: [parquet-field-0, parquet-field-1, parquet-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      vertex: {
        field:new-parquet-field
        udf: {
          separator: "_"
          oldColNames: [parquet-field-0, parquet-field-1]
          newColName: new-parquet-field
        }
        #policy:hash
      }
      batch: 2000
      partition: 60
    }

    # HDFS csv
    # Import mode is sst, just change type.sink to client if you want to use client import mode.
+ { + name: tag-name-1 + type: { + source: csv + sink: sst + } + path: hdfs tag path 2 + # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields + fields: [csv-field-0, csv-field-1, csv-field-2] + nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] + vertex: { + field:csv-field-0 + } + separator: "," + header: true + batch: 2000 + partition: 60 + # optional config, default is false + # config repartitionWithNebula as true means: repartition spark dataframe with nebula partition number to write sst files. + repartitionWithNebula: false + } + + # HDFS json + { + name: tag-name-2 + type: { + source: json + sink: client + } + path: hdfs vertex path 3 + fields: [json-field-0, json-field-1, json-field-2] + nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] + vertex: { + field: json-field-0 + #policy: hash + } + batch: 2000 + partition: 60 + } + + # Hive + { + name: tag-name-3 + type: { + source: hive + sink: client + } + exec: "select hive-field0, hive-field1, hive-field2 from database.table" + fields: [hive-field-0, hive-field-1, hive-field-2] + nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] + vertex: { + field: hive-field-0 + # policy: "hash" + } + batch: 2000 + partition: 60 + } + + # neo4j + { + name: tag-name-4 + type: { + source: neo4j + sink: client + } + server: "bolt://127.0.0.1:7687" + user: neo4j + password: neo4j + exec: "match (n:label) return n.neo4j-field-0 as neo4j-field-0, n.neo4j-field-1 as neo4j-field-1 order by (n.neo4j-field-0)" + fields: [neo4j-field-0, neo4j-field-1] + nebula.fields: [nebula-field-0, nebula-field-1] + vertex: { + field:neo4j-field-0 + # policy:hash + } + partition: 60 + batch: 2000 + check_point_path: /tmp/test + } + + # HBase + # if fields or vertex contains rowkey, please configure it as "rowkey". + { + name: tag-name-5 + type: { + source: hbase + sink: client + } + host:127.0.0.1 + port:2181 + table:hbase-table + columnFamily:hbase-table-cloumnfamily + fields: [hbase-column-0, hbase-column-1] + nebula.fields: [nebula-field-0, nebula-field-1] + vertex: { + field:rowkey + } + partition: 60 + batch: 2000 + } + + # Pulsar + { + name: tag-name-6 + type: { + source: pulsar + sink: client + } + service: "pulsar://localhost:6650" + admin: "http://localhost:8081" + options: { + # choose one of "topic", "topics", "topicsPattern" + topics: "topic1,topic2" + } + fields: [pulsar-field-0, pulsar-field-1, pulsar-field-2] + nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] + vertex: { + field:pulsar-field-0 + } + partition: 60 + batch: 2000 + interval.seconds: 10 + } + + # KAFKA + { + name: tag-name-7 + type: { + source: kafka + sink: client + } + service: "kafka.service.address" + topic: "topic-name" + fields: [kafka-field-0, kafka-field-1, kafka-field-2] + nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] + vertex: { + field: kafka-field-0 + } + partition: 60 + batch: 2000 + interval.seconds: 10 + } + + # MaxCompute + { + name: tag-name-8 + type:{ + source:maxcompute + sink:client + } + table:table + project:project + odpsUrl:"http://service.cn-hangzhou.maxcompute.aliyun.com/api" + tunnelUrl:"http://dt.cn-hangzhou.maxcompute.aliyun.com" + accessKeyId:xxx + accessKeySecret:xxx + partitionSpec:"dt='partition1'" + # default numPartitions is 1 + numPartitions:100 + # maxcompute sql sentence only uses table name. make sure that table name is the same with {table}'s value'. 
      sentence:"select id, maxcompute-field-0, maxcompute-field-1, maxcompute-field-2 from table where id < 10"
      fields:[maxcompute-field-0, maxcompute-field-1]
      nebula.fields:[nebula-field-0, nebula-field-1]
      vertex:{
        field: maxcompute-field-2
      }
      partition:60
      batch:2000
    }

    # ClickHouse
    {
      name: tag-name-9
      type: {
        source: clickhouse
        sink: client
      }
      url:"jdbc:clickhouse://127.0.0.1:8123/database"
      user:"user"
      password:"clickhouse"
      numPartition:"5"
      table:"table"
      sentence:"select * from table"
      fields: [clickhouse-field-0, clickhouse-field-1, clickhouse-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      vertex: {
        field:clickhouse-field-0
        #policy:hash
      }
      batch: 2000
      partition: 60
    }

    # PostgreSQL
    {
      name: tag-name-10
      type: {
        source: postgresql
        sink: client
      }
      host: "127.0.0.1"
      port: "5432"
      database: "database"
      table: "table"
      user: "root"
      password: "nebula"
      sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
      fields: [postgre-field-0, postgre-field-1, postgre-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      vertex: {
        field: postgre-field-0
        # policy: "hash"
      }
      batch: 2000
      partition: 60
    }

    # Oracle
    {
      name: tag-name-11
      type: {
        source: oracle
        sink: client
      }
      url:"jdbc:oracle:thin:@host:1521:db"
      driver: "oracle.jdbc.driver.OracleDriver"
      user: "root"
      password: "nebula"
      table: "db.table"
      sentence: "select oracle-field0, oracle-field1, oracle-field2 from table"
      fields: [oracle-field-0, oracle-field-1, oracle-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      vertex: {
        field: oracle-field-0
        # policy: "hash"
      }
      batch: 2000
      partition: 60
    }
  ]

  # Processing edges
  # There are edge config examples for different data sources.
  edges: [
    # HDFS parquet
    # Import mode is client, just change type.sink to sst if you want to use sst import mode.
    {
      name: edge-name-0
      type: {
        source: parquet
        sink: client
      }
      path: hdfs edge path 0
      fields: [parquet-field-0, parquet-field-1, parquet-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: {
        field:parquet-field-0
        udf:{
          separator:"_"
          oldColNames:[parquet-field-0]
          newColName: new-parquet-field
        }
        #policy:hash
      }
      target: {
        field:parquet-field-1
        udf:{
          separator:"_"
          oldColNames:[parquet-field-0]
          newColName: new-parquet-field
        }
        #policy:hash
      }
      ranking: parquet-field-2
      batch: 2000
      partition: 60
    }

    # HDFS csv
    {
      name: edge-name-1
      type: {
        source: csv
        sink: client
      }
      path: hdfs edge path 1
      fields: [csv-field-0, csv-field-1, csv-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: {
        field: csv-field-0
        #policy: hash
      }
      target: {
        field: csv-field-1
      }
      ranking: csv-field-2
      separator: ","
      header: true
      batch: 2000
      partition: 60
      # optional config, default is false.
      # setting repartitionWithNebula to true means: repartition the spark dataframe with the nebula partition number to write sst files.
      repartitionWithNebula: false
    }

    # HDFS json
    {
      name: edge-name-2
      type: {
        source: json
        sink: client
      }
      path: hdfs edge path 2
      fields: [json-field-0, json-field-1, json-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: {
        field: json-field-0
        #policy: hash
      }
      target: {
        field: json-field-1
      }
      ranking: json-field-2
      batch: 2000
      partition: 60
    }

    # Hive
    {
      name: edge-name-3
      type: {
        source: hive
        sink: client
      }
      exec: "select hive-field0, hive-field1, hive-field2 from database.table"
      fields: [hive-field-0, hive-field-1, hive-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: hive-field-0
      target: hive-field-1
      ranking: hive-field-2
      batch: 2000
      partition: 60
    }

    # Neo4j
    {
      name: edge-name-4
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://127.0.0.1:7687"
      user: neo4j
      password: neo4j
      exec: "match (a:vertex_label)-[r:edge_label]->(b:vertex_label) return a.neo4j-source-field, b.neo4j-target-field, r.neo4j-field-0 as neo4j-field-0, r.neo4j-field-1 as neo4j-field-1 order by id(r)"
      fields: [neo4j-field-0, neo4j-field-1]
      nebula.fields: [nebula-field-0, nebula-field-1]
      source: {
        field: a.neo4j-source-field
      }
      target: {
        field: b.neo4j-target-field
      }
      ranking: neo4j-field-2
      partition: 60
      batch: 2000
      check_point_path: /tmp/test
    }

    # HBase
    {
      name: edge-name-5
      type: {
        source: hbase
        sink: client
      }
      host:127.0.0.1
      port:2181
      table:hbase-table
      columnFamily:hbase-table-columnfamily
      fields: [hbase-column-0, hbase-column-1]
      nebula.fields:[nebula-field-0, nebula-field-1]
      source: {
        field: hbase-column-k
      }
      target: {
        field: hbase-column-h
      }
      ranking: hbase-column-t
      partition: 60
      batch: 2000
    }


    # Pulsar
    {
      name: edge-name-6
      type: {
        source: pulsar
        sink: client
      }
      service: "pulsar://localhost:6650"
      admin: "http://localhost:8081"
      options: {
        # choose one of "topic", "topics", "topicsPattern"
        topic: "topic1"
      }
      fields: [pulsar-field-0, pulsar-field-1, pulsar-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: {
        field: pulsar-field-0
        #policy: hash
      }
      target: {
        field: pulsar-field-1
      }
      ranking: pulsar-field-2
      partition: 60
      batch: 2000
      interval.seconds: 10
    }

    # KAFKA
    {
      name: edge-name-7
      type: {
        source: kafka
        sink: client
      }
      service: "kafka.service.address"
      topic: "topic-name"
      fields: [kafka-field-0, kafka-field-1, kafka-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: kafka-field-0
      target: kafka-field-1
      ranking: kafka-field-2
      partition: 60
      batch: 2000
      interval.seconds: 10
    }

    # MaxCompute
    {
      name: edge-name-8
      type:{
        source:maxcompute
        sink:client
      }
      table:table
      project:project
      odpsUrl:"http://service.cn-hangzhou.maxcompute.aliyun.com/api"
      tunnelUrl:"http://dt.cn-hangzhou.maxcompute.aliyun.com"
      accessKeyId:xxx
      accessKeySecret:xxx
      partitionSpec:"dt='partition1'"
      # the maxcompute sql sentence only uses the table name.
      sentence:"select * from table"
      fields:[maxcompute-field-0, maxcompute-field-1]
      nebula.fields:[nebula-field-0, nebula-field-1]
      source:{
        field: maxcompute-field-2
      }
      target:{
        field: maxcompute-field-3
      }
      ranking: maxcompute-field-4
      partition:60
      batch:2000
    }

    # ClickHouse
    {
      name: edge-name-9
      type: {
        source: clickhouse
        sink: client
      }
      url:"jdbc:clickhouse://127.0.0.1:8123/database"
      user:"user"
      password:"clickhouse"
      numPartition:"5"
      table:"table"
      sentence:"select * from table"
      fields: [clickhouse-field-2]
      nebula.fields: [nebula-field-2]
      source: {
        field:clickhouse-field-0
        #policy:hash
      }
      target: {
        field:clickhouse-field-1
        #policy:hash
      }
      ranking:clickhouse-field-3
      batch: 2000
      partition: 60
    }

    # PostgreSQL
    {
      name: edge-name-10
      type: {
        source: postgresql
        sink: client
      }
      host: "127.0.0.1"
      port: "5432"
      database: "database"
      table: "table"
      user: "root"
      password: "nebula"
      sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
      fields: [postgre-field-0, postgre-field-1, postgre-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: {
        field: postgre-field-0
        # policy: "hash"
      }
      target: {
        field: postgre-field-2
        # policy: "hash"
      }
      ranking: postgre-field-1
      batch: 2000
      partition: 60
    }

    # Oracle
    {
      name: edge-name-11
      type: {
        source: oracle
        sink: client
      }
      url:"jdbc:oracle:thin:@host:1521:db"
      driver: "oracle.jdbc.driver.OracleDriver"
      user: "root"
      password: "nebula"
      table: "db.table"
      sentence: "select oracle-field0, oracle-field1, oracle-field2 from table"
      fields: [oracle-field-0, oracle-field-1, oracle-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      source: {
        field: oracle-field-0
        # policy: "hash"
      }
      target: {
        field: oracle-field-1
      }
      ranking: oracle-field-2
      batch: 2000
      partition: 60
    }
  ]
}
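As a quick sanity check of the new API, a minimal round-trip sketch (an illustrative snippet, not part of the patch; it assumes the exchange-common module is on the classpath, and a fresh key pair is generated on every run):

```scala
import com.vesoft.exchange.common.PasswordEncryption

object PasswordRoundTrip {
  def main(args: Array[String]): Unit = {
    // encryptPassword prints the public key and returns (encryptedPassword, privateKey)
    val (encrypted, privateKey) = PasswordEncryption.encryptPassword("nebula")
    // decryptPassword must recover the plaintext from that pair,
    // mirroring what Configs does when enableRSA is true
    val decrypted = PasswordEncryption.decryptPassword(encrypted, privateKey)
    assert(decrypted == "nebula")
    println(s"round-trip ok: $decrypted")
  }
}
```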