exchange supports filter (#467) #2548

Merged · 1 commit · Apr 26, 2024
@@ -163,6 +163,8 @@ For different data sources, the vertex configurations are different. There are m
|`tags.vertex.policy`|string|-|No|Supports only the value `hash`. Performs hashing operations on VIDs of type string.|
|`tags.batch`|int|`256`|Yes|The maximum number of vertices written into NebulaGraph in a single batch.|
|`tags.partition`|int|`32`|Yes|The number of partitions to be created when the data is written to {{nebula.name}}. If `tags.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`tags.filter`|string|-|No|The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]). See the sketch below this table.|

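Since `tags.filter` is passed to Spark's `Dataset.filter(conditionExpr: String)`, any Spark SQL condition expression is valid. The following is a minimal sketch of the equivalent Spark call, assuming a hypothetical CSV vertex source with a `name` column:

```scala
import org.apache.spark.sql.SparkSession

object FilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("filter-sketch").getOrCreate()

    // Hypothetical vertex source; the path and column names are placeholders.
    val vertices = spark.read.option("header", "true").csv("vertex.csv")

    // Conceptually, `tags.filter: "name='Tom'"` keeps only the rows for which
    // the condition expression evaluates to true, exactly like this call.
    val filtered = vertices.filter("name='Tom'")
    filtered.show()

    spark.stop()
  }
}
```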

#### Specific parameters of Parquet/JSON/ORC data sources

@@ -303,6 +305,7 @@ For the specific parameters of different data sources for edge configurations, p
|`edges.ranking`|int|-|No|The column of rank values. If not specified, all rank values are `0` by default.|
|`edges.batch`|int|`256`|Yes|The maximum number of edges written into NebulaGraph in a single batch.|
|`edges.partition`|int|`32`|Yes|The number of partitions to be created when the data is written to {{nebula.name}}. If `edges.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`edges.filter`|string|-|No|The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]). See the `spark-shell` example below this table.|

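Because the expression is evaluated by Spark, compound conditions work as well. One hedged way to sanity-check an `edges.filter` expression in `spark-shell` before running an import (the source file and column names here are assumptions):

```scala
// In spark-shell, `spark` is predefined.
val edges = spark.read.json("edge_source.json")  // hypothetical edge source

// If the rows shown are the ones you expect, the same string can be set
// as `edges.filter` in the Exchange configuration file.
edges.filter("degree > 90 AND start_year >= 2000").show()
```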
#### Specific parameters for generating SST files

@@ -178,6 +178,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"
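# (Hypothetical additional examples; the filter accepts any Spark SQL
# condition expression, including compound conditions.)
# filter: "age > 30 AND name LIKE 'T%'"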

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -276,6 +279,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -207,6 +207,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -308,6 +311,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -212,6 +212,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -305,6 +308,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -254,6 +254,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -341,6 +344,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -243,6 +243,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -360,6 +363,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -230,6 +230,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -322,6 +325,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -203,6 +203,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -188,6 +188,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -300,6 +303,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -232,6 +232,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -201,6 +201,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -227,6 +227,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -333,6 +336,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -195,6 +195,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -196,6 +196,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -287,6 +290,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -180,6 +180,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -280,6 +283,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -284,6 +284,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -373,6 +376,9 @@ After Exchange is compiled, copy the conf file `target/classes/application.conf`
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -163,8 +163,10 @@ check: the real password decrypted by private key and encrypted password is: neb
|`tags.vertex.udf.newColName`|string|-|No|Combines multiple columns by a custom rule. This parameter specifies the name of the new column.|
|`tags.vertex.prefix`|string|-|No|Adds the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` results in `tag1_12345`. The underscore cannot be modified.|
|`tags.vertex.policy`|string|-|No|Supports only the value `hash`. Performs hashing operations on VIDs of type string.|
|`tags.batch`|int|`256`|Yes|The maximum number of vertices written into {{nebula.name}} in a single batch.|
|`tags.partition`|int|`32`|Yes|The number of partitions to be created when the data is written to {{nebula.name}}. If `tags.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`tags.filter`|string|-|No|The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]).|


#### Specific parameters of Parquet/JSON/ORC data sources

@@ -304,8 +306,10 @@ check: the real password decrypted by private key and encrypted password is: neb
|`edges.target.prefix`|string|-|No|Adds the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` results in `tag1_12345`. The underscore cannot be modified.|
|`edges.target.policy`|string|-|No|Supports only the value `hash`. Performs hashing operations on VIDs of type string.|
|`edges.ranking`|int|-|No|The column of rank values. If not specified, all rank values are `0` by default.|
|`edges.batch`|int|`256`|Yes|The maximum number of edges written into {{nebula.name}} in a single batch.|
|`edges.partition`|int|`32`|Yes|The number of partitions to be created when the data is written to {{nebula.name}}. If `edges.partition ≤ 1`, the number of partitions to be created in {{nebula.name}} is the same as that in the data source.|
|`edges.filter`|string|-|No|The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}. For information about filtering formats, see [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#filter(conditionExpr:String):org.apache.spark.sql.Dataset[T]).|


#### Specific parameters for generating SST files

@@ -179,6 +179,9 @@
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -281,6 +284,9 @@
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -206,6 +206,9 @@
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -305,6 +308,9 @@
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -210,6 +210,9 @@ ROW COLUMN+CELL
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -302,6 +305,9 @@ ROW COLUMN+CELL
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -249,6 +249,9 @@ scala> sql("select playerid, teamid, start_year, end_year from basketball.serve"
# policy:hash
}

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT

@@ -336,6 +339,9 @@ scala> sql("select playerid, teamid, start_year, end_year from basketball.serve"
# (Optional) Specify a column as the source of the rank.
#ranking: rank

# The filtering rule. The data that matches the filter rule is imported into {{nebula.name}}.
# filter: "name='Tom'"

# Batch operation types, including INSERT, UPDATE, and DELETE. The default value is INSERT.
#writeMode: INSERT
