Skip to content

Commit

Permalink
[Feature][Clickhouse] Support sink savemode (#8086)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang authored Dec 4, 2024
1 parent d33b0da commit e6f92fd
Show file tree
Hide file tree
Showing 30 changed files with 2,055 additions and 273 deletions.
61 changes: 61 additions & 0 deletions docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,69 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| primary_key | String | No | - | Mark the primary key column from clickhouse table, and based on primary key execute INSERT/UPDATE/DELETE to clickhouse table. |
| support_upsert | Boolean | No | false | Support upsert row by query primary key. |
| allow_experimental_lightweight_delete | Boolean | No | false | Allow experimental lightweight delete based on `*MergeTree` table engine. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode. Please refer to the `schema_save_mode` section below. |
| data_save_mode | Enum | no | APPEND_DATA | Data save mode. Please refer to the `data_save_mode` section below. |
| save_mode_create_template | string | no | see below | See below. |
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. |

### schema_save_mode[Enum]

Before starting the synchronization task, choose different processing options for the existing table schema.
Option descriptions:
`RECREATE_SCHEMA`: Create the table if it does not exist; drop and recreate the table when saving.
`CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the table if it does not exist; skip if the table already exists.
`ERROR_WHEN_SCHEMA_NOT_EXIST`: Throw an error if the table does not exist.
`IGNORE`: Ignore the processing of the table.

### data_save_mode[Enum]

Before starting the synchronization task, choose different processing options for the existing data on the target side.
Option descriptions:
`DROP_DATA`: Retain the database schema but delete the data.
`APPEND_DATA`: Retain the database schema and the data.
`CUSTOM_PROCESSING`: Custom user-defined processing.
`ERROR_WHEN_DATA_EXISTS`: Throw an error if data exists.

### save_mode_create_template

Automatically create Doris tables using templates.
The table creation statements will be generated based on the upstream data types and schema. The default template can be modified as needed.

Default template:
```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE = MergeTree()
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
```

If custom fields are added to the template, for example, adding an `id` field:

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
id,
${rowtype_fields}
) ENGINE = MergeTree()
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
```

The connector will automatically retrieve the corresponding types from the upstream source and fill in the template, removing the `id` field from the `rowtype_fields`. This method can be used to modify custom field types and attributes.

The following placeholders can be used:

- `database`: Retrieves the database from the upstream schema.
- `table_name`: Retrieves the table name from the upstream schema.
- `rowtype_fields`: Retrieves all fields from the upstream schema and automatically maps them to Doris field descriptions.
- `rowtype_primary_key`: Retrieves the primary key from the upstream schema (this may be a list).
- `rowtype_unique_key`: Retrieves the unique key from the upstream schema (this may be a list).

## How to Create a Clickhouse Data Synchronization Jobs

The following example demonstrates how to create a data synchronization job that writes randomly generated data to a Clickhouse database:
Expand Down
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ source {

sink {
Assert {
source_table_name = hive_source
plugin_input = hive_source
rules {
row_rules = [
{
Expand Down
65 changes: 64 additions & 1 deletion docs/zh/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
| ARRAY | Array |
| MAP | Map |

## 输出选项
## Sink 选项

| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|---------------------------------------|---------|------|-------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
Expand All @@ -58,8 +58,71 @@
| primary_key | String | No | - | 标记`clickhouse`表中的主键列,并根据主键执行INSERT/UPDATE/DELETE到`clickhouse`表. |
| support_upsert | Boolean | No | false | 支持按查询主键更新插入行. |
| allow_experimental_lightweight_delete | Boolean | No | false | 允许基于`MergeTree`表引擎实验性轻量级删除. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的`schema_save_mode` |
| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的`data_save_mode`|
| save_mode_create_template | string | no | see below | 见下文。 |
| common-options | | No | - | Sink插件查用参数,详见[Sink常用选项](../sink-common-options.md). |

### schema_save_mode[Enum]

在开启同步任务之前,针对现有的表结构选择不同的处理方案。
选项介绍:
`RECREATE_SCHEMA` :表不存在时创建,表保存时删除并重建。
`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。
`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。
`IGNORE` :忽略对表的处理。

### data_save_mode[Enum]

在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。
选项介绍:
`DROP_DATA`: 保留数据库结构并删除数据。
`APPEND_DATA`:保留数据库结构,保留数据。
`CUSTOM_PROCESSING`:用户自定义处理。
`ERROR_WHEN_DATA_EXISTS`:有数据时报错。

### save_mode_create_template

使用模板自动创建Doris表,
会根据上游数据类型和schema类型创建相应的建表语句,
默认模板可以根据情况进行修改。

默认模板:
```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE = MergeTree()
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
```

如果模板中填写了自定义字段,例如添加 id 字段

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
id,
${rowtype_fields}
) ENGINE = MergeTree()
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
index_granularity = 8192;
```

连接器会自动从上游获取对应类型完成填充,
并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。

可以使用以下占位符:

- database:用于获取上游schema中的数据库。
- table_name:用于获取上游schema中的表名。
- rowtype_fields:用于获取上游schema中的所有字段,自动映射到Doris的字段描述。
- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。
- rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。

## 如何创建一个clickhouse 同步任务

以下示例演示如何创建将随机生成的数据写入Clickhouse数据库的数据同步作业。
Expand Down
4 changes: 2 additions & 2 deletions docs/zh/connector-v2/source/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ source {
table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
metastore_uri = "thrift://metastore:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
result_table_name = hive_source
plugin_output = hive_source
hive_site_path = "/tmp/hive-site.xml"
kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
kerberos_keytab_path = "/tmp/hive.keytab"
Expand All @@ -197,7 +197,7 @@ source {

sink {
Assert {
source_table_name = hive_source
plugin_input = hive_source
rules {
row_rules = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has data"),
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"),
LIST_DATABASES_FAILED("API-12", "List databases failed"),
LIST_TABLES_FAILED("API-13", "List tables failed"),
GET_PRIMARY_KEY_FAILED("API-14", "Get primary key failed");

private final String code;
private final String description;
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@
<version>${project.version}</version>
<classifier>optional</classifier>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

Expand Down
Loading

0 comments on commit e6f92fd

Please sign in to comment.