Skip to content
This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

import data inside kafka value to nebula #157

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ target/
.idea/
.eclipse/
*.iml
.metals/

spark-importer.ipr
spark-importer.iws
Expand Down
2 changes: 1 addition & 1 deletion nebula-exchange/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ target/
spark-importer.ipr
spark-importer.iws

.DS_Store
.DS_Store
118 changes: 118 additions & 0 deletions nebula-exchange/README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,124 @@ nebula-exchange-2.0.0.jar \

关于 Nebula Exchange 的更多说明,请参考 Exchange 2.0 的[使用手册](https://docs.nebula-graph.com.cn/2.0.1/nebula-exchange/about-exchange/ex-ug-what-is-exchange/) 。

## Nebula Exchange 2.5 添加新功能:导入kafka value字段数据到Nebula Graph

主要修改如下:

*1. kafka使用简介*

*2. kafka配置说明*

*3. kafka导入限制*

*4. kafka数据重载*

### kafka使用简介

本功能主要解决从kafka获取数据并导入到Nebula的问题。具体来说,当数据源为kafka时,会默认从kafka的value字段中解析出配置文件里定义的多个tag,多个edge或者多个tag和edge,并将其以client的方式导入到Nebula中。

### kafka配置说明
由于kafka是流式数据,所以当数据源是kafka时,程序只能一直接收来自kafka的数据,不能进行数据源的切换。因此在本次更新中,我们将kafka的配置项单独列出来,当定义了kafka配置项时,表明配置文件里的tag和edge都从kafka的value字段中解析出来。配置项示例如下所示:
```
kafka: {
# 对于kafka,必须在这里设置相关参数,其他源(如hive等)在tag内部指定即可
# 只能从kafka的value(必须是json字符串)中解析相关数据
# 只能以client模式将kafka数据导入nebula中

# Kafka服务器地址。
# 可以多个。
service: "kafka-server1,kafka-server2"
# 消息类别。
# 只能一个。
topic: "kafka-topic"
# 单次写入 Nebula Graph 的最大点数据量。
batch: 50
# Spark 分区数量
partition: 10
# 读取消息的间隔。单位:秒。
interval.seconds: 30

# 如果不需要重新加载未成功执行的ngql语句(保存在error.output路径下)那么这里可以注释掉忽略。
# 如果需要加载,请确保这些语句没有语义和语法错误。
# reload有三种模式:
# continue:重载一次保存在error.output下的数据,不管执行成功与否。重载完后会继续消费kafka消息。
# onlyonce:重载一次保存在error.output下的数据,不管执行成功与否。重载完后退出程序不消费kafka消息。
# needretry:一直重载保存在error.output下的数据,直到全部重载成功或者达到retry次数限制。若不定义retry,则默认为3。执行完后退出程序不消费kafka消息。
# reload: continue
# retry: 3

# 是否将kafka中的value字段和从value中解析出来的tag/edge数据打印到log里,若注释掉,则默认是false。
verbose: true
}

tags: [
{
# Nebula Graph中对应的标签名称。
name: tagName

# 在fields里指定player表中的列名称,其对应的value会作为Nebula Graph中指定属性。
# fields和nebula.fields里的配置必须一一对应。
# 如果需要指定多个列名称,用英文逗号(,)隔开。

# fields中的列名称必须存在于kafka value的json里,否则解析失败。
# 记录在kafka value字段里的json数据必须都是string/long/int类型。
fields: [col1, col2]
nebula.fields: [col1, col2]

# 指定表中某一列数据为Nebula Graph中点VID的来源。必须唯一,否则vid相同的数据会被覆盖。
vertex:{
field:vid
}
}
]

edges: [
{
# Nebula Graph中对应的边类型名称。
name: edgeName

# 在fields里指定follow表中的列名称,其对应的value会作为Nebula Graph中指定属性。
# fields和nebula.fields里的配置必须一一对应。
# 如果需要指定多个列名称,用英文逗号(,)隔开。

# fields中的列名称必须存在于kafka value的json里,否则解析失败。
# 记录在kafka value字段里的json数据必须都是string/long/int类型。
fields: [col1]
nebula.fields: [col1]

# 在source里,将follow表中某一列作为边的起始点数据源。必须唯一,否则vid相同的数据会被覆盖。
# 在target里,将follow表中某一列作为边的目的点数据源。必须唯一,否则vid相同的数据会被覆盖。
# 注意,相同schema且soure/target相同的边必须指定ranking。
source:{
field:src
}
target:{
field:dst
}
}
]
```

值得注意的是,kafka可以从多个server处消费数据,但是只能消费一个topic的数据。此外,当数据源不是kafka时,请将kafka项注释掉,否则一旦定义了kafka配置项,就会默认从kafka处消费数据,此时tag/edge内部的type.source失效。

### kafka导入限制
- kafka模式默认从kafka的value字段解析数据,此时kafka的其他字段,如key,offset,partition等都会抛弃。
- value字段必须是json字符串,且每个field的值都必须是string或者int/long类型。程序会根据在Nebula schema中定义的数据类型来获取数据。
- kafka模式只支持client模式导入数据。
- 对于边数据,必须指定source和target字段,明确指明这条边对应的两个顶点,不允许isGeo为true的情况。
- 如果数据为null,在生成kafka消息时,直接往json里put空值即可,如jsonObj.put(stringField, null)。

### kafka数据重载
kafka导入失败的ngql语句会落盘到errorpath指定的文件里。我们一共定义了三种重载数据的方式:
- continue:重载一次保存在error.output下的数据,不管执行成功与否。重载完后会继续消费kafka消息。
- onlyonce:重载一次保存在error.output下的数据,不管执行成功与否。重载完后退出程序不消费kafka消息。
- needretry:一直重载保存在error.output下的数据,直到全部重载成功或者达到retry次数限制。若不定义retry,则默认为3。执行完后退出程序不消费kafka消息。

continue模式适合在开启新的程序时,先重载一次数据,然后再消费。onlyonce和needretry模式适合在不影响已经运行的程序的情况下,另开一个程序处理错误语句。
我们修改了程序里reload数据的逻辑,每次都会只保留本次reload失败的数据,并删除上次reload失败的数据。使用时请再确保重载的数据没有语义和语法错误。
此外,由于kafka写入错误语句的文件需要复用,因此我们将文件的写入方式改为append。


## 贡献

Nebula Exchange 2.0 是一个完全开源的项目,欢迎开源爱好者通过以下方式参与:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ object ErrorHandler {
val fileSystem = FileSystem.get(new Configuration())
val filesStatus = fileSystem.listStatus(new Path(path))
for (file <- filesStatus) {
if (!file.getPath.getName.startsWith("reload.")) {
if (!file.getPath.getName.startsWith("reload.") ||
file.getPath.getName.endsWith(".lastReload")) {
fileSystem.delete(file.getPath, true)
}
}
Expand All @@ -47,8 +48,14 @@ object ErrorHandler {
*/
def save(buffer: ArrayBuffer[String], path: String): Unit = {
LOG.info(s"create reload path $path")
val fileSystem = FileSystem.get(new Configuration())
val errors = fileSystem.create(new Path(path))
val fileSystem = FileSystem.get(new Configuration())
val targetPath = new Path(path)
val errors = if (fileSystem.exists(targetPath)) {
// For kafka, the error ngql need to append to a same file instead of overwrite
fileSystem.append(targetPath)
} else {
fileSystem.create(targetPath)
}

try {
for (error <- buffer) {
Expand All @@ -60,6 +67,30 @@ object ErrorHandler {
}
}

/**
* rename the stale reload file
*
* @param path
*/
def rename(path: String): Unit = {
try {
val fileSystem = FileSystem.get(new Configuration())
val filesStatus = fileSystem.listStatus(new Path(path))
for (file <- filesStatus) {
val reloadFile = file.getPath.getName
if (reloadFile.startsWith("reload.")) {
if(!fileSystem.rename(file.getPath, new Path(reloadFile + ".lastReload"))) {
LOG.warn(s"Can not rename $reloadFile. The new reload data will append after stale reload data")
}
}
}
} catch {
case e: Throwable => {
LOG.error(s"Stale reload file in $path cannot be rename ", e)
}
}
}

/**
* check if path exists
*
Expand Down
124 changes: 111 additions & 13 deletions nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import com.vesoft.nebula.exchange.config.{
Neo4JSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
SourceCategory,
KafkaReloadCategory
}
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor, KafkaProcessor}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
Expand All @@ -43,6 +44,7 @@ import com.vesoft.nebula.exchange.reader.{
import com.vesoft.nebula.exchange.processor.ReloadProcessor
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import scala.util.control.Breaks

final case class Argument(config: String = "application.conf",
hive: Boolean = false,
Expand Down Expand Up @@ -120,6 +122,70 @@ object Exchange {
sys.exit(0)
}

if (configs.kafkaConfigEntry.isDefined) {
if (configs.tagsConfig.isEmpty && configs.edgesConfig.isEmpty) {
throw new IllegalArgumentException("Both tag and edge is empty, nothing to parse")
}
val kafkaConfig = configs.kafkaConfigEntry.get
// handle error
if (kafkaConfig.reloadEntry.isDefined) {
val pattern = kafkaConfig.reloadEntry.get.reload
LOG.info(s"Need to reload kafka data in ${configs.errorConfig.errorPath}, the pattern is $pattern.")
pattern match {
case KafkaReloadCategory.ONLYONCE =>
if (!handleError(spark, configs, configs.errorConfig.errorPath)) {
LOG.info("Can not reload data successfully")
}
LOG.info("Reload kafka data down")
LOG.info("Kafka reload category is onlyonce, so we shutdown after reload data.")
spark.close()
sys.exit(0)
case KafkaReloadCategory.NEEDRETRY =>
val retry = kafkaConfig.reloadEntry.get.retry
LOG.info(s"Retry times: $retry")
val loop = new Breaks
var cnt = 0
loop.breakable(
while (cnt < retry) {
if (handleError(spark, configs, configs.errorConfig.errorPath)) {
loop.break()
} else {
cnt += 1
}
}
)
if (cnt == retry) {
LOG.error("Can not reload data successfully")
}
LOG.info("Reload kafka data down")
LOG.info("Kafka reload category is needtry, so we shutdown after reload data.")
spark.close()
sys.exit(0)
case KafkaReloadCategory.CONTINUE =>
if (!handleError(spark, configs, configs.errorConfig.errorPath)) {
LOG.info("Can not reload data successfully")
}
LOG.info("Reload data down. Begin consume kakfa data and send it to nebula")
case _ =>
throw new IllegalArgumentException("Unknow pattern for kafka reload.")
}
}
LOG.info("Parse tag and edge from kafka value and insert them to nebula")
// import data inside kafka value to Nebula
val data = createDataSource(spark, kafkaConfig)
if (data.isDefined && !c.dry) {
val processor = new KafkaProcessor(
spark,
repartition(data.get, kafkaConfig.partition, SourceCategory.KAFKA),
configs)
processor.process()
} else {
LOG.info("Unknow error, can not read data from kafka")
}
spark.close()
sys.exit(0)
}

// record the failed batch number
var failures: Long = 0L

Expand Down Expand Up @@ -206,17 +272,8 @@ object Exchange {
}

// reimport for failed tags and edges
if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(configs.errorConfig.errorPath)
val startTime = System.currentTimeMillis()
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
processor.process()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(s"reimport ngql cost time: ${costTime}")
LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
if (failures > 0) {
handleError(spark, configs, configs.errorConfig.errorPath)
}
spark.close()
}
Expand Down Expand Up @@ -304,6 +361,47 @@ object Exchange {
}
}

/**
* handle error gql in error path
*
* @param spark
* @param configs
* @param errorpath
* @param needStored
*
* @return
*/
private[this] def handleError(spark: SparkSession, configs: Configs, errorpath: String): Boolean = {
if (ErrorHandler.existError(errorpath)) {
LOG.info(s"Begin handle error stored in $errorpath")
// get date need to reload
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(errorpath)
data.cache()

// rename the reload file
ErrorHandler.rename(errorpath)

val count = data.count()
val startTime = System.currentTimeMillis()
val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
processor.process()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(s"reimport ngql count: ${count}, cost time: ${costTime}")
LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s"batchFailure.reimport: ${batchFailure.value}")

// clear stale reload file generated during last reload period.
// Only store reload file created during this reload process period.
LOG.info(s"clear error files in $errorpath except file which start with reload")
ErrorHandler.clear(errorpath)
batchFailure.value == 0
} else {
LOG.info(s"There is no error stored in $errorpath. Skip error handle")
true
}
}
/**
* Repartition the data frame using the specified partition number.
*
Expand Down
Loading