support to get template config file according to the datasource (#170)
Nicole00 authored Oct 19, 2023
1 parent a4e888c commit 079d161
Showing 15 changed files with 1,117 additions and 37 deletions.
10 changes: 8 additions & 2 deletions README-CN.md
Exchange supports Spark versions 2.2, 2.4, and 3.0. You can also refer to the Exchange 1.0 user guide: [NebulaExchange User Manual](https://docs.nebula-graph.com.cn/nebula-exchange/about-exchange/ex-ug-what-is-exchange/ "Go to the Nebula Graph website").

> Note: Version 3.4.0 does not support Kafka or Pulsar. To import Kafka or Pulsar data into NebulaGraph, please use version 3.0.0, 3.3.0, or 3.5.0.

## How to Get Exchange

Go to the [GitHub Actions Artifacts](https://github.com/vesoft-inc/nebula-exchange/actions/workflows/snapshot.yml) page, click any workflow, and download what you need from the Artifacts list.

## Automatically Generate a Sample Config File

Run the following command and specify the data source to be imported to get a sample config file for that data source:

```bash
java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s {source} -p {target-path-to-save-config-file}
```

## Version Compatibility

The version correspondence between Exchange and NebulaGraph is as follows:
103 changes: 68 additions & 35 deletions README.md
# NebulaGraph Exchange
[中文版](https://github.com/vesoft-inc/nebula-exchange/blob/master/README-CN.md)

NebulaGraph Exchange (referred to as Exchange) is an Apache Spark™ application used to migrate data in bulk from different sources to NebulaGraph in a distributed way. It supports a variety of batch and streaming data sources and allows direct writing to NebulaGraph through side-loading (SST files).

Exchange supports Spark versions 2.2, 2.4, and 3.0, along with their respective toolkits: `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`, and `nebula-exchange_spark_3.0`.

> Note:
> - Exchange 3.4.0 does not support Apache Kafka and Apache Pulsar. Please use Exchange 3.0.0, 3.3.0, or 3.5.0 to load data from Apache Kafka or Apache Pulsar into NebulaGraph for now.
> - This repo covers only NebulaGraph 2.x and 3.x. For NebulaGraph v1.x, please use [NebulaGraph Exchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange).

## Build or Download Exchange

```bash
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-exchange_spark_3.0 -am -Pscala-2.12 -Pspark-3.0
```

After packaging, the newly generated JAR files can be found in the following paths:
- nebula-exchange/nebula-exchange_spark_2.2/target/ contains nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar
- nebula-exchange/nebula-exchange_spark_2.4/target/ contains nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar
- nebula-exchange/nebula-exchange_spark_3.0/target/ contains nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar

3. Download from the GitHub artifacts

**Released Version:**

[GitHub Releases](https://github.com/vesoft-inc/nebula-exchange/releases)
```bash
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
-c application.conf
```

Note: When using Exchange to generate SST files, please add `spark.sql.shuffle.partitions` in `--conf` for Spark's shuffle operation:
```bash
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
--conf spark.sql.shuffle.partitions=200 \
nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
-c application.conf
```
For more details, please refer to [NebulaGraph Exchange Docs](https://docs.nebula-graph.io/master/nebula-exchange/about-exchange/ex-ug-what-is-exchange/).

## How to Get the Config File

You can get a template config file for your data source with the following command:

```bash
java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s {source} -p {target-path-to-save-config-file}
```

For example, if your data source is CSV and you want to save the template config file under /tmp/, run:

```bash
java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s csv -p /tmp
```

## Version Compatibility Matrix

Here is the version correspondence between Exchange and NebulaGraph:

| Exchange Version | Nebula Version | Spark Version |
|:------------------------------------------:|:--------------:|:-------------------------------:|
| nebula-exchange-2.0.0.jar | 2.0.0, 2.0.1 | 2.4.* |
| nebula-exchange-2.0.1.jar | 2.0.0, 2.0.1 | 2.4.* |
| nebula-exchange-2.1.0.jar | 2.0.0, 2.0.1 | 2.4.* |
| nebula-exchange-2.5.0.jar | 2.5.0, 2.5.1 | 2.4.* |
| nebula-exchange-2.5.1.jar | 2.5.0, 2.5.1 | 2.4.* |
| nebula-exchange-2.5.2.jar | 2.5.0, 2.5.1 | 2.4.* |
| nebula-exchange-2.6.0.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange-2.6.1.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange-2.6.2.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange-2.6.3.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange_spark_2.2-3.x.x.jar | 3.x.x | 2.2.* |
| nebula-exchange_spark_2.4-3.x.x.jar | 3.x.x | 2.4.* |
| nebula-exchange_spark_3.0-3.x.x.jar | 3.x.x | `3.0.*`,`3.1.*`,`3.2.*`,`3.3.*` |
| nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar | nightly | 2.2.* |
| nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar | nightly | 2.4.* |
| nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar | nightly | `3.0.*`,`3.1.*`,`3.2.*`,`3.3.*` |

## Feature History

1. *Since 2.0* Exchange allows for the import of vertex data with both String and Integer type IDs.
2. *Since 2.0* Exchange also supports importing data of various types, including Null, Date, DateTime (using UTC instead of local time), and Time.
3. *Since 2.0* In addition to Hive on Spark, Exchange can import data from other Hive sources as well.
4. *Since 2.0* If there are failures during the data import process, Exchange supports recording and retrying the INSERT statement.
5. *Since 2.5* While SST import is supported by Exchange, property default values are not yet supported.
6. *Since 3.0* Exchange is compatible with Spark 2.2, Spark 2.4, and Spark 3.0.

Refer to [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf) as an example to edit the configuration file.
10 changes: 10 additions & 0 deletions exchange-common/pom.xml
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.16</version>
</dependency>
</dependencies>

<build>
67 additions: exchange-common/src/main/java/com/vesoft/exchange/common/FileMigrate.java (new file)
/* Copyright (c) 2023 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.exchange.common;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class FileMigrate {

    /**
     * Migrate the source file to the target path.
     *
     * @param sourceFile template config file
     * @param path       target path to save the config info
     */
    public void saveConfig(String sourceFile, String path) {
        InputStream inputStream =
                this.getClass().getClassLoader().getResourceAsStream(sourceFile);
        if (inputStream == null) {
            System.err.println("Template config file does not exist: " + sourceFile);
            System.exit(-1);
        }
        File file = new File(path);
        if (file.exists()) {
            file.delete();
        }
        BufferedWriter bufferedWriter = null;
        BufferedReader reader = null;
        try {
            bufferedWriter = new BufferedWriter(new FileWriter(path));
            reader = new BufferedReader(new InputStreamReader(inputStream));
            String line;
            while ((line = reader.readLine()) != null) {
                bufferedWriter.write(line);
                bufferedWriter.write("\n");
            }
        } catch (IOException e) {
            System.err.println("Failed to migrate the template conf file: " + e.getMessage());
            e.printStackTrace();
        } finally {
            try {
                if (bufferedWriter != null) {
                    bufferedWriter.close();
                }
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                System.err.println("Failed to close the writer or reader: " + e.getMessage());
                e.printStackTrace();
            }
        }
    }
}
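As a side note, the same resource-to-file copy can be expressed more compactly with try-with-resources and `java.nio.file.Files`. The sketch below is illustrative only; the `FileMigrateSketch` class name and the inline sample content are hypothetical, not part of this commit:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileMigrateSketch {

    /**
     * Copy a template stream to the target path, replacing any existing file.
     * try-with-resources guarantees the stream is closed even on failure.
     */
    static void saveConfig(InputStream template, Path target) throws IOException {
        if (template == null) {
            throw new IllegalArgumentException("template resource not found");
        }
        try (InputStream in = template) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for getResourceAsStream("config_template/csv.conf")
        InputStream template = new ByteArrayInputStream(
                "tags: []\n".getBytes(StandardCharsets.UTF_8));
        Path target = Files.createTempFile("csv-template", ".conf");
        saveConfig(template, target);
        System.out.println(Files.readString(target).trim());
    }
}
```

Unlike the explicit finally block above, a failure while copying here propagates as an IOException instead of being swallowed, which the caller can then report.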
100 changes: 100 additions & 0 deletions exchange-common/src/main/resources/config_template/csv.conf
# Use the command to submit the exchange job:

# spark-submit \
# --master "spark://master_ip:7077" \
# --driver-memory=2G --executor-memory=30G \
# --num-executors=3 --total-executor-cores=60 \
# --class com.vesoft.nebula.exchange.Exchange \
# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c csv.conf

{
# Spark config
spark: {
app: {
name: NebulaGraph Exchange
}
}

# Nebula Graph config
nebula: {
address:{
graph: ["127.0.0.1:9669","127.0.0.2:9669"]
# if your NebulaGraph server is in a virtual network such as k8s, please configure the meta leader address.
# use `SHOW meta leader` to see your meta leader's address
meta: ["127.0.0.1:9559"]
}
user: root
pswd: nebula
space: test

# nebula client connection parameters
connection {
# socket connect & execute timeout, unit: millisecond
timeout: 30000
}

error: {
# max number of failures, if the number of failures is bigger than max, then exit the application.
max: 32
# failed data will be recorded in output path, format with ngql
output: "hdfs://127.0.0.1:9000/tmp/errors"
}

# use google's RateLimiter to limit the requests send to NebulaGraph
rate: {
# the stable throughput of RateLimiter
limit: 1024
# Acquires a permit from RateLimiter, unit: MILLISECONDS
# if it can't be obtained within the specified timeout, then give up the request.
timeout: 1000
}
}

# Processing tags
tags: [
{
name: tag-name
type: {
source: csv
sink: client
}
# if your file is not in HDFS, use "file:///path/test.csv"
path: "hdfs://ip:port/path/test.csv"
# if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
fields: [csv-field-1, csv-field-2, csv-field-3]
nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
vertex: {
field: csv-field-0
}
separator: ","
header: true
batch: 2000
partition: 60
}
]

# process edges
edges: [
{
name: edge-name
type: {
source: csv
sink: client
}
path: "hdfs://ip:port/path/test.csv"
fields: [csv-field-2, csv-field-3, csv-field-4]
nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
source: {
field: csv-field-0
}
target: {
field: csv-field-1
}
#ranking: csv-field-2
separator: ","
header: true
batch: 2000
partition: 60
}
]
}
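For illustration only, a hypothetical `test.csv` matching the tag mapping in this template (`header: true`, vertex ID taken from `csv-field-0`, properties from `csv-field-1` through `csv-field-3`) could look like:

```csv
csv-field-0,csv-field-1,csv-field-2,csv-field-3
player100,Tim Duncan,42,true
player101,Tony Parker,36,false
```

The column names and values are made up for this sketch; in practice the header row must match the names listed under `fields`, or, with `header: false`, the positional names `_c0`, `_c1`, ... are used instead.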