Merged
32 commits
6b994af
newFeature: init oceanbase flink cdc pipeline connector
yuanoOo May 17, 2024
0647ecb
Enhancement: Refactor and improve each module
yuanoOo May 17, 2024
3c2a289
Merge branch 'refs/heads/master' into flink-cdc-pipeline-connector-oc…
yuanoOo May 17, 2024
294a537
Enhancement: add test for OceanBaseCatalogFactory
yuanoOo May 17, 2024
15067d2
Enhancement: add mysql to oceanbase end-2-end integrate test
yuanoOo May 23, 2024
b5d94bb
Merge branch 'apache:master' into flink-cdc-pipeline-connector-oceanbase
yuanoOo May 24, 2024
a6e7307
Enhancement: refine oceanbase catalog
yuanoOo May 24, 2024
23b4e1e
Merge remote-tracking branch 'origin/flink-cdc-pipeline-connector-oce…
yuanoOo May 24, 2024
2e3d199
Enhancement: Bump dependency of oceanbase flink connector to 1.2
yuanoOo May 24, 2024
85a7826
BugFix: add licence header
yuanoOo May 27, 2024
81b537c
Enhancement: add flink-cdc-pipeline-connector-oceanbase maven module …
yuanoOo May 27, 2024
9010854
Enhancement: refine test case
yuanoOo May 27, 2024
e9011b6
Merge branch 'apache:master' into flink-cdc-pipeline-connector-oceanbase
yuanoOo May 27, 2024
9d39771
Enhancement: Add validation to the applyAddColumnEvent method.
yuanoOo May 27, 2024
fa92312
Enhancement: Add a rename column test case to the integration tests.
yuanoOo May 27, 2024
581faa6
Enhancement: remove unnecessary dependencies.
yuanoOo May 28, 2024
e76f984
Enhancement: add doc for oceanbase-cdc-pipeline-connector
yuanoOo May 29, 2024
a1bd106
Enhancement: Improve documentation and integration tests.
yuanoOo May 30, 2024
14240b0
Merge branch 'master' into flink-cdc-pipeline-connector-oceanbase
yuanoOo Jun 12, 2024
75298b5
Enhancement: Refine code according to CR.
yuanoOo Jun 13, 2024
285d9c5
Enhancement: Remove unnecessary container startup in unit tests.
yuanoOo Jun 13, 2024
408f1a3
doc: Fix the incorrect description of driver in the documentation.
yuanoOo Jun 14, 2024
78ab2a8
Enhancement: Fix code according to CR.
yuanoOo Jun 17, 2024
02889e8
BugFix: git commit file: OptionUtils.
yuanoOo Jun 18, 2024
f4b497a
Merge branch 'apache:master' into flink-cdc-pipeline-connector-oceanbase
yuanoOo Jun 28, 2024
dbc2ae3
Merge branch 'refs/heads/master' into flink-cdc-pipeline-connector-oc…
yuanoOo Aug 19, 2024
aa3ecd4
Enhancement:Refactored the oceanbase container creation logic.
yuanoOo Aug 20, 2024
0b1dfe4
Merge branch 'refs/heads/master' into flink-cdc-pipeline-connector-oc…
yuanoOo Nov 8, 2024
0551f4f
Enhancement:Resolved conflicts.
yuanoOo Nov 8, 2024
0654604
Enhancement:Fix code.
yuanoOo Nov 18, 2024
93e6d5e
Enhancement:Fix code.
yuanoOo Nov 20, 2024
da6b501
Merge branch 'refs/heads/master' into flink-cdc-pipeline-connector-oc…
yuanoOo Nov 27, 2024
3 changes: 2 additions & 1 deletion .github/workflows/flink_cdc_base.yml
@@ -50,7 +50,8 @@ env:
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks,\
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka,\
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon,\
-flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch"
+flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch,\
+flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase"

MODULES_MYSQL: "\
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc,\
349 changes: 349 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/oceanbase.md
@@ -0,0 +1,349 @@
---
title: "OceanBase"
weight: 7
type: docs
aliases:
- /connectors/pipeline-connectors/oceanbase
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# OceanBase Connector

The OceanBase Pipeline connector can be used as the *Data Sink* of a pipeline to write data to [OceanBase](https://github.com/oceanbase/oceanbase). This document describes how to set up the OceanBase Pipeline connector.

## Connector Features
* Automatic table creation
* Schema change synchronization
* Real-time data synchronization

## Example

A pipeline that reads data from MySQL and syncs it to OceanBase can be defined as follows:

```yaml
source:
  type: mysql
  hostname: mysql
  port: 3306
  username: mysqluser
  password: mysqlpw
  tables: mysql_2_oceanbase_test_17l13vc.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: oceanbase
  url: jdbc:mysql://oceanbase:2881/test
  username: root@test
  password:

pipeline:
  name: MySQL to OceanBase Pipeline
  parallelism: 1
```
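
The sink above sets only the connection options. As a rough sketch of how the buffered write path might be tuned, the variant below adds a few of the optional settings documented for this connector. The specific values (`2s`, `5000`, `5`, `0.8`) are illustrative assumptions, not recommendations.

```yaml
sink:
  type: oceanbase
  url: jdbc:mysql://oceanbase:2881/test
  username: root@test
  password:
  # sync-write is left at its default (false), so writes go through the buffer.
  # Flush the buffer every 2 seconds (illustrative value; the default is 1s).
  buffer-flush.interval: 2s
  # Buffer size (illustrative value; the default is 1000).
  buffer-flush.buffer-size: 5000
  # Retry failed writes up to 5 times (illustrative value; the default is 3).
  max-retries: 5
  # Memstore check with a threshold of 0.8 relative to the maximum limit
  # (illustrative value; the default is 0.9).
  memstore-check.enabled: true
  memstore-check.threshold: 0.8
  memstore-check.interval: 30s
```

Only the `sink` block changes; the `source` and `pipeline` blocks from the example can stay as they are.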

## Connector Options

<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th>Option</th>
<th>Required</th>
<th>Default</th>
<th>Type</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>type</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies which connector to use; this must be set to <code>'oceanbase'</code>.</td>
</tr>
<tr>
<td>url</td>
<td>required</td>
<td></td>
<td>String</td>
<td>JDBC URL of the database.</td>
</tr>
<tr>
<td>username</td>
<td>required</td>
<td></td>
<td>String</td>
<td>User name for the connection.</td>
</tr>
<tr>
<td>password</td>
<td>required</td>
<td></td>
<td>String</td>
<td>Password for the connection.</td>
</tr>
<tr>
<td>schema-name</td>
<td>optional</td>
<td></td>
<td>String</td>
<td>Name of the schema or database to connect to.</td>
</tr>
<tr>
<td>table-name</td>
<td>optional</td>
<td></td>
<td>String</td>
<td>Table name.</td>
</tr>
<tr>
<td>driver-class-name</td>
<td>optional</td>
<td>com.mysql.cj.jdbc.Driver</td>
<td>String</td>
<td>Driver class name; defaults to 'com.mysql.cj.jdbc.Driver'. The connector does not bundle this driver, so it must be added manually.</td>
</tr>
<tr>
<td>druid-properties</td>
<td>optional</td>
<td></td>
<td>String</td>
<td>Druid connection pool properties; separate multiple values with semicolons.</td>
</tr>
<tr>
<td>sync-write</td>
<td>optional</td>
<td>false</td>
<td>Boolean</td>
<td>Whether to enable synchronous writes. When set to true, data is written directly to the database without using the buffer.</td>
</tr>
<tr>
<td>buffer-flush.interval</td>
<td>optional</td>
<td>1s</td>
<td>Duration</td>
<td>Buffer flush interval. Set to '0' to disable periodic flushing.</td>
</tr>
<tr>
<td>buffer-flush.buffer-size</td>
<td>optional</td>
<td>1000</td>
<td>Integer</td>
<td>Buffer size.</td>
</tr>
<tr>
<td>max-retries</td>
<td>optional</td>
<td>3</td>
<td>Integer</td>
<td>Number of retries on failure.</td>
</tr>
<tr>
<td>memstore-check.enabled</td>
<td>optional</td>
<td>true</td>
<td>Boolean</td>
<td>Whether to enable the memstore check.</td>
</tr>
<tr>
<td>memstore-check.threshold</td>
<td>optional</td>
<td>0.9</td>
<td>Double</td>
<td>Memstore usage threshold, expressed as a ratio of the maximum limit.</td>
</tr>
<tr>
<td>memstore-check.interval</td>
<td>optional</td>
<td>30s</td>
<td>Duration</td>
<td>Interval between memstore usage checks.</td>
</tr>
<tr>
<td>partition.enabled</td>
<td>optional</td>
<td>false</td>
<td>Boolean</td>
<td>Whether to enable partition calculation and write data by partition. Takes effect only when both 'sync-write' and 'direct-load.enabled' are false.</td>
</tr>
<tr>
<td>direct-load.enabled</td>
<td>optional</td>
<td>false</td>
<td>Boolean</td>
<td>Whether to enable direct load. Note that direct load requires the sink parallelism to be set to 1.</td>
</tr>
<tr>
<td>direct-load.host</td>
<td>optional</td>
<td></td>
<td>String</td>
<td>Host name or IP address used for direct load. Required when direct load is enabled.</td>
</tr>
<tr>
<td>direct-load.port</td>
<td>optional</td>
<td>2882</td>
<td>Integer</td>
<td>RPC port used for direct load. Required when direct load is enabled.</td>
</tr>
<tr>
<td>direct-load.parallel</td>
<td>optional</td>
<td>8</td>
<td>Integer</td>
<td>Parallelism of the direct-load task.</td>
</tr>
<tr>
<td>direct-load.max-error-rows</td>
<td>optional</td>
<td>0</td>
<td>Long</td>
<td>Maximum number of error rows the direct-load task tolerates.</td>
</tr>
<tr>
<td>direct-load.dup-action</td>
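<td>optional</td>
<td>REPLACE</td>
<td>String</td>
<td>Policy for handling duplicate primary keys in the direct-load task. One of 'STOP_ON_DUP' (the load fails), 'REPLACE' (replace the existing row), or 'IGNORE' (ignore the new row).</td>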
</tr>
<tr>
<td>direct-load.timeout</td>
<td>optional</td>
<td>7d</td>
<td>Duration</td>
<td>Timeout of the direct-load task.</td>
</tr>
<tr>
<td>direct-load.heartbeat-timeout</td>
<td>optional</td>
<td>30s</td>
<td>Duration</td>
<td>Heartbeat timeout of the direct-load client.</td>
</tr>
</tbody>
</table>
</div>
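
As one possible reading of the direct-load options above, a sink that enables direct load could look like the following sketch. The host value `oceanbase` is a placeholder, the other direct-load values simply restate the documented defaults, and setting `pipeline.parallelism` to 1 is assumed here as the way to keep the sink at parallelism 1.

```yaml
sink:
  type: oceanbase
  url: jdbc:mysql://oceanbase:2881/test
  username: root@test
  password:
  # Switch the sink to direct load.
  direct-load.enabled: true
  # Host is required once direct load is enabled; "oceanbase" is a placeholder.
  direct-load.host: oceanbase
  # RPC port; 2882 is the documented default.
  direct-load.port: 2882
  # The values below restate the documented defaults for clarity.
  direct-load.parallel: 8
  direct-load.max-error-rows: 0
  direct-load.dup-action: REPLACE

pipeline:
  name: MySQL to OceanBase Pipeline
  # Direct load requires the sink to run with parallelism 1.
  parallelism: 1
```

With `direct-load.enabled` set to true, `partition.enabled` has no effect, since it applies only when both `sync-write` and `direct-load.enabled` are false.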

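## Usage Notes

* Currently only the MySQL tenant of OceanBase is supported.

* Provides at-least-once semantics; exactly-once is not yet supported.

* For automatic table creation
  * Tables are created without a partition key.

* For schema change synchronization
  * Currently only adding columns and renaming columns are supported.
  * A new column can only be added as the last column.

* For data synchronization, the pipeline connector uses the [OceanBase Sink connector](https://github.com/oceanbase/flink-connector-oceanbase)
  to write data to OceanBase. See the [Sink documentation](https://github.com/oceanbase/flink-connector-oceanbase/blob/main/docs/sink/flink-connector-oceanbase.md) for details.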

## Data Type Mapping
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">CDC type</th>
<th class="text-left">OceanBase type under MySQL tenant</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
<td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
<td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
<td></td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
<td></td>
</tr>
<tr>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>TIMESTAMP</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>TIMESTAMP</td>
<td></td>
</tr>
<tr>
<td>CHAR(n) where n <= 256</td>
<td>CHAR(n)</td>
<td></td>
</tr>
<tr>
<td>CHAR(n) where n > 256</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
</tbody>
</table>
</div>

{{< top >}}
@@ -38,6 +38,7 @@ Flink CDC provides Pipeline Source and Sink connectors for use in YAML jobs
| [MySQL]({{< ref "docs/connectors/pipeline-connectors/mysql" >}}) | Source | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 |
| [Paimon]({{< ref "docs/connectors/pipeline-connectors/paimon" >}}) | Sink | <li> [Paimon](https://paimon.apache.org/): 0.6, 0.7, 0.8 |
| [StarRocks]({{< ref "docs/connectors/pipeline-connectors/starrocks" >}}) | Sink | <li> [StarRocks](https://www.starrocks.io/): 2.x, 3.x |
+| [OceanBase]({{< ref "docs/connectors/pipeline-connectors/oceanbase" >}}) | Sink | <li> [OceanBase](https://www.oceanbase.com/): 3.x, 4.x |

## Develop Your Own Connector
