update document & fix source read bug(fields config is optional)
DESKTOP-GHPCOV0\dingaolong committed Oct 25, 2024
1 parent 32d1f76 commit 38d8011
Showing 5 changed files with 197 additions and 13 deletions.
58 changes: 48 additions & 10 deletions docs/en/connector-v2/sink/TDengine.md
@@ -6,20 +6,25 @@

Used to write data to TDengine. You need to create the stable (super table) before running the SeaTunnel task.

***Attention***
+ You don't need to configure the sub-table name; the first column read is used as the sub-table name.
+ The last n columns are used as table tags, where n is the number of tags.
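
The column layout described in the notes above can be sketched in Java. This is only an illustration under stated assumptions: the row is modeled as a plain `List<Object>`, and `RowLayoutSketch` with its three methods is a hypothetical helper, not a class from the connector.

```java
import java.util.List;

/** Illustrative only: splits one incoming row the way the notes above describe. */
final class RowLayoutSketch {
    private RowLayoutSketch() {}

    /** The first column is used as the sub-table name. */
    static String subTableName(List<Object> row) {
        return String.valueOf(row.get(0));
    }

    /** The columns between the sub-table name and the tags are the values to write. */
    static List<Object> metricValues(List<Object> row, int tagCount) {
        return row.subList(1, row.size() - tagCount);
    }

    /** The last tagCount columns are the tag values. */
    static List<Object> tagValues(List<Object> row, int tagCount) {
        return row.subList(row.size() - tagCount, row.size());
    }
}
```

With a stable that declares one tag, a row of four columns splits into one sub-table name, two written values and one tag value.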

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------|--------|----------|---------------|
| url | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | |
| stable | string | yes | - |
| fields | list | no | - |
| timezone | string | no | UTC |

### url [string]
@@ -54,18 +59,51 @@ the timezone of the TDengine server, it's important to the ts field

## Example

### sink
### write 3 columns into table power.meter

power.meter schema information:

```sql
CREATE STABLE `meter` (`ts` TIMESTAMP, `latitude` DOUBLE, `longtitude` DOUBLE) TAGS (`tenant_id` INT)
```
SeaTunnel config file:

```hocon
source {
# This is an example input plugin **only for testing and demonstrating the input plugin feature**
FakeSource {
row.num = 1
schema = {
fields {
table_name = int
ts = timestamp
latitude = double
tenant_id = int
}
}
result_table_name = "fake"
}
}
sink {
TDengine {
url : "jdbc:TAOS-RS://localhost:6041/"
username : "root"
password : "taosdata"
database : "power2"
stable : "meters2"
timezone: UTC
}
TDengine {
source_table_name='fake'
url : "jdbc:TAOS-RS://localhost:6041/"
username : "root"
password : "taosdata"
database : "test"
fields: ['ts','latitude']
stable : "meter"
timezone: UTC
}
}
```
Execute a query and print the table data:
```sql
taos> select * from meter;
ts | latitude | longtitude | tenant_id |
================================================================================================
2024-03-08 03:47:54.000 | 9.594743198982019e+306 | NULL | 1215981593 |
2024-08-18 07:34:56.000 | 1.712320739538011e+308 | NULL | 1965348204 |
2024-07-05 21:59:04.000 | 4.499682197436227e+307 | NULL | 203469706 |
```

8 changes: 7 additions & 1 deletion docs/en/connector-v2/source/TDengine.md
@@ -20,13 +20,14 @@ supports query SQL and can achieve projection effect.

## Options

| name | type | required | default value |
|-------------|--------|----------|---------------|
| url | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | |
| stable | string | yes | - |
| fields | list | no | - |
| lower_bound | long | yes | - |
| upper_bound | long | yes | - |

@@ -56,6 +57,10 @@ the database of the TDengine when you select

the stable of the TDengine when you select

### fields [list]

The columns to read from the source table, for example: [ts,longtitude]. This option is optional.
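
A rough Java sketch of how an optional column list can shape the query: an empty or missing `fields` falls back to `*`. This is an assumption for illustration only; `SelectSqlSketch.buildSelect` is a hypothetical helper and is not taken from the connector's source reader.

```java
import java.util.List;

/** Illustrative only: builds the projection part of a query from an optional column list. */
final class SelectSqlSketch {
    private SelectSqlSketch() {}

    static String buildSelect(String database, String stable, List<String> fields) {
        // No configured fields: read every column of the stable.
        String projection =
                (fields == null || fields.isEmpty()) ? "*" : String.join(",", fields);
        return String.format("SELECT %s FROM %s.%s", projection, database, stable);
    }
}
```

Treating a missing list the same as an empty one keeps the option truly optional for callers.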

### lower_bound [long]

the lower_bound of the migration period
@@ -76,6 +81,7 @@ source {
password : "taosdata"
database : "power"
stable : "meters"
fields : [ts,longtitude]
lower_bound : "2018-10-03 14:38:05.000"
upper_bound : "2018-10-03 14:38:16.800"
result_table_name = "tdengine_result"
82 changes: 82 additions & 0 deletions docs/zh/connector-v2/sink/TDengine.md
@@ -0,0 +1,82 @@
# TDengine

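> TDengine (Taos) sink connector
## Description

Connects to a TDengine (Taos) database and writes data into it.

***Points to note***

+ You do not need to specify the sub-table name when writing to TDengine; the first field read by the reader is used as the sub-table name. For example, if the data source is a JDBC task whose query is `select device_id,ts,power,tenant_id from xxx`, the sub-table name will be the value of device_id.
+ Likewise, the last few columns read are used as tags. If the target super table declares tenant_id as its tag, the value of tenant_id is written as the tag.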

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)

## Sink Options

| name | type | required | default value | description |
|----------|--------|----------|---------------|----------------------------------------------------|
| url | string | yes | - | JDBC URL used to connect to the database, for example: jdbc:TAOS-RS://localhost:6041 |
| username | string | yes | - | Username used to connect to the database |
| password | string | yes | - | Password used to connect to the database |
| database | string | yes | - | Name of the database that data is written to |
| stable | string | yes | - | Name of the table (super table) that data is written to |
| fields | list | no | - | Table columns to write |
| timezone | string | no | - | Client time zone |

## Task Example

### Write the ts and latitude fields into the power.meters table

Schema of the power.meters table:

```sql
CREATE STABLE `meter` (`ts` TIMESTAMP, `latitude` DOUBLE, `longtitude` DOUBLE) TAGS (`tenant_id` INT)
```
SeaTunnel configuration:
```hocon
source {
# This is an example input plugin **only for testing and demonstrating the input plugin feature**
FakeSource {
row.num = 1
schema = {
fields {
table_name = int
ts = timestamp
latitude = double
tenant_id = int
}
}
result_table_name = "fake"
}
}
sink {
TDengine {
source_table_name='fake'
url : "jdbc:TAOS-RS://localhost:6041/"
username : "root"
password : "taosdata"
database : "test"
fields: ['ts','latitude']
stable : "meter"
timezone: UTC
}
}
```
Query the TDengine database to view the latest data:

```sql
taos> select * from meter;
ts | latitude | longtitude | tenant_id |
================================================================================================
2024-03-08 03:47:54.000 | 9.594743198982019e+306 | NULL | 1215981593 |
2024-08-18 07:34:56.000 | 1.712320739538011e+308 | NULL | 1965348204 |
2024-07-05 21:59:04.000 | 4.499682197436227e+307 | NULL | 203469706 |
```
48 changes: 48 additions & 0 deletions docs/zh/connector-v2/source/TDengine.md
@@ -0,0 +1,48 @@
# TDengine

> TDengine (Taos) source connector
## Description

Reads data from a TDengine (Taos) data source. Supports reading the whole table or only specified columns.

## Key features

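- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)

## Source Options

| name | type | required | default value | description |
|-------------|--------|----------|---------------|------------------------------------------------------------------------|
| url | string | yes | - | JDBC URL used to connect to the database, for example: jdbc:TAOS-RS://localhost:6041 |
| username | string | yes | - | Username used to connect to the database |
| password | string | yes | - | Password used to connect to the database |
| database | string | yes | - | Name of the database that contains the source table |
| stable | string | yes | - | Name of the source table |
| fields | list | no | - | Columns to read from the source table |
| lower_bound | long | yes | - | Filter condition: the earliest timestamp of the data to read from the source |
| upper_bound | long | yes | - | Filter condition: the latest timestamp of the data to read from the source |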

## Task Example

### Read the ts and longtitude fields from the power.meters table, restricted to the time range [2018-10-03 14:38:05.000, 2018-10-03 14:38:16.800)

```hocon
source {
TDengine {
url : "jdbc:TAOS-RS://localhost:6041/"
username : "root"
password : "taosdata"
database : "power"
stable : "meters"
fields : [ts,longtitude]
lower_bound : "2018-10-03 14:38:05.000"
upper_bound : "2018-10-03 14:38:16.800"
result_table_name = "tdengine_result"
}
}
```

@@ -26,6 +26,7 @@
import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

@@ -100,12 +101,21 @@ public void write(SeaTunnelRow element) {
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
String sql =
String.format(
"INSERT INTO %s using %s tags ( %s ) (%s) VALUES ( %s );",
"INSERT INTO %s using %s tags ( %s ) VALUES ( %s );",
element.getField(0),
config.getStable(),
tagValues,
String.join(",", config.getFields()),
StringUtils.join(convertDataType(metrics), ","));
if (CollectionUtils.isNotEmpty(config.getFields())) {
sql =
String.format(
"INSERT INTO %s using %s tags ( %s ) (%s) VALUES ( %s );",
element.getField(0),
config.getStable(),
tagValues,
String.join(",", config.getFields()),
StringUtils.join(convertDataType(metrics), ","));
}
log.debug("sql content: {}", sql);
final int rowCount = statement.executeUpdate(sql);
if (rowCount == 0) {
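
The change in this hunk comes down to emitting the column list only when `fields` is configured. Below is a standalone Java sketch of that branching. It assumes plain strings in place of the connector's `config`, `element` and `metrics` objects, and `InsertSqlSketch.buildInsert` is a hypothetical name rather than a method in the connector.

```java
import java.util.List;

/** Illustrative only: mirrors the two INSERT shapes used in the hunk above. */
final class InsertSqlSketch {
    private InsertSqlSketch() {}

    static String buildInsert(
            String subTable,
            String stable,
            String tagValues,
            List<String> fields,
            String metricValues) {
        // The column list is emitted only when fields is configured;
        // otherwise the values are written in the stable's column order.
        String columns =
                (fields == null || fields.isEmpty())
                        ? ""
                        : " (" + String.join(",", fields) + ")";
        return String.format(
                "INSERT INTO %s using %s tags ( %s )%s VALUES ( %s );",
                subTable, stable, tagValues, columns, metricValues);
    }
}
```

Building SQL by string formatting follows the style of the hunk; the sketch does not escape or validate its inputs.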