diff --git a/releases/release-4.0.10.md b/releases/release-4.0.10.md index 5d902bff62e08..00695178ad91c 100644 --- a/releases/release-4.0.10.md +++ b/releases/release-4.0.10.md @@ -33,7 +33,6 @@ TiDB 版本:4.0.10 + TiCDC - - 为 `maxwell` 协议默认开启 old value 特性 [#1144](https://github.com/pingcap/tiflow/pull/1144) - 默认启用 unified sorter 特性 [#1230](https://github.com/pingcap/tiflow/pull/1230) + Dumpling @@ -82,7 +81,6 @@ TiDB 版本:4.0.10 + TiCDC - - 修复 `maxwell` 协议的问题,包括 `base64` 数据输出和将 TSO 转换成 unix timestamp [#1173](https://github.com/pingcap/tiflow/pull/1173) - 修复过期的元数据可能引发新创建的 changefeed 异常的问题 [#1184](https://github.com/pingcap/tiflow/pull/1184) - 修复在关闭的 notifier 上创建 receiver 的问题[#1199](https://github.com/pingcap/tiflow/pull/1199) - 修复在 etcd 更新缓慢时导致内存访问量增长的问题 [#1227](https://github.com/pingcap/tiflow/pull/1227) diff --git a/releases/release-4.0.6.md b/releases/release-4.0.6.md index a2f9f659540ea..9b64ad4641ea0 100644 --- a/releases/release-4.0.6.md +++ b/releases/release-4.0.6.md @@ -1,5 +1,9 @@ --- title: TiDB 4.0.6 Release Notes +<<<<<<< HEAD +======= +summary: TiDB 4.0.6 发布日期为 2020 年 9 月 15 日。新功能包括 TiFlash 中支持在广播 Join 中使用外连接,TiDB Dashboard 添加了多个页面和功能。TiCDC 从 v4.0.6 起成为正式功能。优化提升方面,TiDB 提升了分区表的写性能,支持调整 Union 执行算子的并发度等。Bug 修复方面,TiDB 修复了多个查询结果不正确的问题。TiKV 修复了统计信息估算错误的问题等。PD 修复了 store limit 的单位问题等。TiFlash 修复了多个启动失败和异常问题。Tools 方面也有多个问题得到解决。 +>>>>>>> 58f8d6b51b (cdc: remove maxwell protocol, add simple protocol (#18030) (#18171)) --- # TiDB 4.0.6 Release Notes @@ -26,8 +30,6 @@ TiDB 版本:4.0.6 + TiCDC(自 v4.0.6 起,TiCDC 成为**正式功能**,可用于生产环境) - - 支持输出 `maxwell` 格式的数据 [#869](https://github.com/pingcap/tiflow/pull/869) - ## 优化提升 + TiDB diff --git a/releases/release-4.0.8.md b/releases/release-4.0.8.md index 6dd50fcde2e26..4a0f9ea04787d 100644 --- a/releases/release-4.0.8.md +++ b/releases/release-4.0.8.md @@ -148,7 +148,6 @@ TiDB 版本:4.0.8 - 修复 owner 因更新 GC safepoint 失败而非预期退出的问题 [#979](https://github.com/pingcap/tiflow/pull/979) - 修复非预期的任务信息更新 [#1017](https://github.com/pingcap/tiflow/pull/1017) - - 修复非预期的空 Maxwell 消息 [#978](https://github.com/pingcap/tiflow/pull/978) + TiDB Lightning diff --git a/releases/release-5.0.6.md b/releases/release-5.0.6.md index 9da0042d1342d..c60613c2c4c06 100644 --- a/releases/release-5.0.6.md +++ b/releases/release-5.0.6.md @@ -149,7 +149,7 @@ TiDB 版本:5.0.6 - 修复 Avro sink 模块不支持解析 JSON 类型列的问题 [#3624](https://github.com/pingcap/tiflow/issues/3624) - 修复 TiKV owner 重启时 TiCDC 从 TiKV 读取到错误的元数据快照的问题 [#2603](https://github.com/pingcap/tiflow/issues/2603) - 修复执行 DDL 后的内存泄漏的问题 [#3174](https://github.com/pingcap/tiflow/issues/3174) - - 修复 Canal 和 Maxwell 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) + - 修复 Canal 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) - 修复 `cdc server` 命令在 Red Hat Enterprise Linux 系统的部分版本(如 6.8、6.9 等)上运行时出现时区错误的问题 [#3584](https://github.com/pingcap/tiflow/issues/3584) - 修复当 Kafka 为下游时 `txn_batch_size` 监控指标数据不准确的问题 [#3431](https://github.com/pingcap/tiflow/issues/3431) - 修复 tikv_cdc_min_resolved_ts_no_change_for_1m 监控在没有 changefeed 的情况下持续更新的问题 [#11017](https://github.com/tikv/tikv/issues/11017) diff --git a/releases/release-5.1.4.md b/releases/release-5.1.4.md index eec303f14452a..0d36a28c18fed 100644 --- a/releases/release-5.1.4.md +++ b/releases/release-5.1.4.md @@ -151,7 +151,7 @@ TiDB 版本:5.1.4 - 修复人为删除 etcd 任务的状态时导致 TiCDC panic 的问题 [#2980](https://github.com/pingcap/tiflow/issues/2980) - 修复在某些 RHEL 发行版上因时区问题导致服务无法启动的问题 [#3584](https://github.com/pingcap/tiflow/issues/3584) - 修复 MySQL sink 模块出现死锁时告警过于频繁的问题 [#2706](https://github.com/pingcap/tiflow/issues/2706) - - 修复 Canal 和 Maxwell 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) + - 修复 Canal 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) - 修复 Avro sink 模块不支持解析 JSON 类型列的问题 [#3624](https://github.com/pingcap/tiflow/issues/3624) - 修复监控 checkpoint lag 出现负值的问题 [#3010](https://github.com/pingcap/tiflow/issues/3010) - 修复在容器环境中 OOM 的问题 [#1798](https://github.com/pingcap/tiflow/issues/1798) diff --git a/releases/release-5.2.4.md b/releases/release-5.2.4.md index 93b3513ef8e02..47e1276137a38 100644 --- a/releases/release-5.2.4.md +++ b/releases/release-5.2.4.md @@ -181,7 +181,7 @@ TiDB 版本:5.2.4 - 修复在某些 RHEL 发行版上因时区问题导致服务无法启动的问题 [#3584](https://github.com/pingcap/tiflow/issues/3584) - 修复集群升级后 `stopped` 状态的 changefeed 自动恢复的问题 [#3473](https://github.com/pingcap/tiflow/issues/3473) - 修复 MySQL sink 模块出现死锁时告警过于频繁的问题 [#2706](https://github.com/pingcap/tiflow/issues/2706) - - 修复 Canal 和 Maxwell 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) + - 修复 Canal 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) - 修复 Avro sink 模块不支持解析 JSON 类型列的问题 [#3624](https://github.com/pingcap/tiflow/issues/3624) - 修复监控 checkpoint lag 出现负值的问题 [#3010](https://github.com/pingcap/tiflow/issues/3010) - 修复在容器环境中 OOM 的问题 [#1798](https://github.com/pingcap/tiflow/issues/1798) diff --git a/releases/release-5.3.1.md b/releases/release-5.3.1.md new file mode 100644 index 0000000000000..99d0d50d1b9d5 --- /dev/null +++ b/releases/release-5.3.1.md @@ -0,0 +1,165 @@ +--- +title: TiDB 5.3.1 Release Notes +summary: TiDB 5.3.1 发布日期为 2022 年 3 月 3 日。此版本包含兼容性更改、提升改进和 Bug 修复。兼容性更改包括 TiDB Lightning 工具的默认值调整。提升改进包括 TiDB、TiKV 和 PD 的优化。Bug 修复包括 TiDB、TiKV、PD、TiFlash、Backup & Restore (BR)、TiCDC、TiDB Data Migration (DM) 和 TiDB Lightning 工具的问题修复。 +--- + +# TiDB 5.3.1 Release Notes + +发版日期:2022 年 3 月 3 日 + +TiDB 版本:5.3.1 + +## 兼容性更改 + ++ Tools + + + TiDB Lightning + + - 将 `regionMaxKeyCount` 的默认值从 1_440_000 调整为 1_280_000,以避免数据导入后出现过多空 Region [#30018](https://github.com/pingcap/tidb/issues/30018) + +## 提升改进 + ++ TiDB + + - 优化用户登录模式匹配的逻辑,增强与 MySQL 的兼容性 [#32648](https://github.com/pingcap/tidb/issues/32648) + ++ TiKV + + - 通过减少需要进行清理锁 (Resolve Locks) 步骤的 Region 数量来减少 TiCDC 恢复时间 [#11993](https://github.com/tikv/tikv/issues/11993) + - 通过增加对 Raft log 进行垃圾回收 (GC) 时的 write batch 大小来加快 GC 速度 [#11404](https://github.com/tikv/tikv/issues/11404) + - 将 proc filesystem (procfs) 升级至 0.12.0 版本 [#11702](https://github.com/tikv/tikv/issues/11702) + +- PD + + - 优化 `DR_STATE` 文件内容的格式 [#4341](https://github.com/tikv/pd/issues/4341) + ++ Tools + + + TiCDC + + - 暴露 Kafka producer 配置参数,使之在 TiCDC 中可配置 [#4385](https://github.com/pingcap/tiflow/issues/4385) + - 当指定 S3 为存储后端时,在 TiCDC 启动前增加预清理逻辑 [#3878](https://github.com/pingcap/tiflow/issues/3878) + - TiCDC 客户端能在未指定证书名的情况下工作 [#3627](https://github.com/pingcap/tiflow/issues/3627) + - 修复因 checkpoint 不准确导致的潜在的数据丢失问题 [#3545](https://github.com/pingcap/tiflow/issues/3545) + - 为 changefeed 重启操作添加指数退避机制 [#3329](https://github.com/pingcap/tiflow/issues/3329) + - 将 Kafka Sink `partition-num` 的默认值改为 3,使 TiCDC 更加平均地分发消息到各个 Kafka partition [#3337](https://github.com/pingcap/tiflow/issues/3337) + - 减少 "EventFeed retry rate limited" 日志的数量 [#4006](https://github.com/pingcap/tiflow/issues/4006) + - 将 `max-message-bytes` 默认值设置为 10M [#4041](https://github.com/pingcap/tiflow/issues/4041) + - 增加更多 Prometheus 和 Grafana 监控告警参数,包括 `no owner alert`、`mounter row`、`table sink total row` 和 `buffer sink total row` [#4054](https://github.com/pingcap/tiflow/issues/4054) [#1606](https://github.com/pingcap/tiflow/issues/1606) + - 减少 TiKV 节点宕机后 KV client 恢复的时间 [#3191](https://github.com/pingcap/tiflow/issues/3191) + + - TiDB Lightning + + - 优化了本地磁盘空间检查失败时前置检查的提示信息 [#30395](https://github.com/pingcap/tidb/issues/30395) + +## Bug 修复 + ++ TiDB + + - 修复 date_format 对 `'\n'` 的处理与 MySQL 不兼容的问题 [#32232](https://github.com/pingcap/tidb/issues/32232) + - 修复 `alter column set default` 错误地修改表定义的问题 [#31074](https://github.com/pingcap/tidb/issues/31074) + - 修复开启 `tidb_restricted_read_only` 后 `tidb_super_read_only` 没有自动开启的问题 [#31745](https://github.com/pingcap/tidb/issues/31745) + - 修复带有 collation 的 `greatest` 或 `least` 函数结果出错的问题 [#31789](https://github.com/pingcap/tidb/issues/31789) + - 修复执行查询时报 MPP task list 为空错误的问题 [#31636](https://github.com/pingcap/tidb/issues/31636) + - 修复 innerWorker panic 导致的 index join 结果错误的问题 [#31494](https://github.com/pingcap/tidb/issues/31494) + - 修复将 `FLOAT` 列改为 `DOUBLE` 列后查询结果有误的问题 [#31372](https://github.com/pingcap/tidb/issues/31372) + - 修复查询时用到 index lookup join 导致 `invalid transaction` 报错的问题 [#30468](https://github.com/pingcap/tidb/issues/30468) + - 修复针对 `Order By` 的优化导致查询结果有误的问题 [#30271](https://github.com/pingcap/tidb/issues/30271) + - 修复 `MaxDays` 和 `MaxBackups` 配置项对慢日志不生效的问题 [#25716](https://github.com/pingcap/tidb/issues/25716) + - 修复 `INSERT ... SELECT ... ON DUPLICATE KEY UPDATE` 语句 panic 的问题 [#28078](https://github.com/pingcap/tidb/issues/28078) + ++ TiKV + + - 修复 Peer 状态为 Applying 时快照文件被删除会造成 Panic 的问题 [#11746](https://github.com/tikv/tikv/issues/11746) + - 修复开启流量控制且显式设置 `level0_slowdown_trigger` 时出现 QPS 下降的问题 [#11424](https://github.com/tikv/tikv/issues/11424) + - 修复 cgroup controller 没有被挂载会造成 Panic 的问题 [#11569](https://github.com/tikv/tikv/issues/11569) + - 修复 TiKV 停止后 Resolved TS 延迟会增加的问题 [#11351](https://github.com/tikv/tikv/issues/11351) + - 修复 GC worker 繁忙后无法执行范围删除(即执行 `unsafe_destroy_range` 参数)的问题 [#11903](https://github.com/tikv/tikv/issues/11903) + - 修复删除 Peer 可能造成高延迟的问题 [#10210](https://github.com/tikv/tikv/issues/10210) + - 修复 Region 没有数据时 `any_value` 函数结果错误的问题 [#11735](https://github.com/tikv/tikv/issues/11735) + - 修复删除未初始化的副本可能会造成旧副本被重新创建的问题 [#10533](https://github.com/tikv/tikv/issues/10533) + - 修复在已完成重新选举但没有通知被隔离的 Peer 的情况下执行 `Prepare Merge` 会导致元数据损坏的问题 [#11526](https://github.com/tikv/tikv/issues/11526) + - 修复协程的执行速度太快时偶尔出现的死锁问题 [#11549](https://github.com/tikv/tikv/issues/11549) + - 修复某个 TiKV 节点停机会导致 Resolved Timestamp 进度落后的问题 [#11351](https://github.com/tikv/tikv/issues/11351) + - 修复 Raft client 中 batch 消息过大的问题 [#9714](https://github.com/tikv/tikv/issues/9714) + - 修复在极端情况下同时进行 Region Merge、ConfChange 和 Snapshot 时,TiKV 会出现 Panic 的问题 [#11475](https://github.com/tikv/tikv/issues/11475) + - 修复逆序扫表时 TiKV 无法正确读到内存锁的问题 [#11440](https://github.com/tikv/tikv/issues/11440) + - 修复当达到磁盘容量满时 RocksDB flush 或 compaction 导致的 panic [#11224](https://github.com/tikv/tikv/issues/11224) + - 修复 tikv-ctl 无法正确输出 Region 相关信息的问题 [#11393](https://github.com/tikv/tikv/issues/11393) + - 修复 TiKV 监控项中实例级别 gRPC 的平均延迟时间不准确的问题 [#11299](https://github.com/tikv/tikv/issues/11299) + ++ PD + + - 修复特定情况下调度带有不需要的 JointConsensus 步骤的问题 [#4362](https://github.com/tikv/pd/issues/4362) + - 修复对单个 Voter 直接进行降级时无法执行调度的问题 [#4444](https://github.com/tikv/pd/issues/4444) + - 修复更新副本同步模式的配置时出现的数据竞争问题 [#4325](https://github.com/tikv/pd/issues/4325) + - 修复特定情况下读锁不释放的问题 [#4354](https://github.com/tikv/pd/issues/4354) + - 修复当 Region 心跳低于 60 秒时热点 Cache 不能清空的问题 [#4390](https://github.com/tikv/pd/issues/4390) + ++ TiFlash + + - 修复 `cast(arg as decimal(x,y))` 在入参 `arg` 大于 `decimal(x,y)` 的表示范围时,计算结果出错的问题 + - 修复开启 `max_memory_usage` 和 `max_memory_usage_for_all_queries` 配置项时,TiFlash 崩溃的问题 + - 修复 `cast(string as real)` 在部分场景下计算结果出错的问题 + - 修复 `cast(string as decimal)` 在部分场景下计算结果出错的问题 + - 修复在把主键列类型修改为一个范围更大的整型类型时,数据索引可能不一致的问题 + - 修复 `in` 表达式在形如 `(arg0, arg1) in (x,y)` 的多个参数的情况下计算结果出错的问题 + - 修复当 MPP 查询被终止时,TiFlash 偶发的崩溃问题 + - 修复 `str_to_date` 函数在入参以 0 开头时,计算结果出错的问题 + - 修复当查询的过滤条件形如 `where ` 时,计算结果出错的问题 + - 修复 `cast(string as datetime)` 在入参形如 `%Y-%m-%d\n%H:%i:%s` 时,查询结果出错的问题 + ++ Tools + + + Backup & Restore (BR) + + - 修复当恢复完成后,Region 有可能分布不均的问题 [#31034](https://github.com/pingcap/tidb/issues/31034) + + + TiCDC + + - 修复了 varchar 类型值长度过长时的 `Column length too big` 错误 [#4637](https://github.com/pingcap/tiflow/issues/4637) + - 修复了 TiCDC 进程在 PD leader 被杀死时的异常退出问题 [#4248](https://github.com/pingcap/tiflow/issues/4248) + - 修复了 UPDATE 语句在安全模式下执行错误会导致 DM 进程挂掉的问题 [#4317](https://github.com/pingcap/tiflow/issues/4317) + - 修复 `cached region` 监控指标为负数的问题 [#4300](https://github.com/pingcap/tiflow/issues/4300) + - 修复了 HTTP API 在查询的组件不存在时导致 CDC 挂掉的问题 [#3840](https://github.com/pingcap/tiflow/issues/3840) + - 修复了移除一个暂停的同步时,CDC Redo Log 无法被正确清理的问题 [#4740](https://github.com/pingcap/tiflow/issues/4740) + - 修复在容器环境中 OOM 的问题 [#1798](https://github.com/pingcap/tiflow/issues/1798) + - 修复了停止加载中的任务会导致它被意外调度的问题 [#3771](https://github.com/pingcap/tiflow/issues/3771) + - 纠正了在 Loader 上使用 `query-status` 命令查询到错误的进度的问题 [#3252](https://github.com/pingcap/tiflow/issues/3252) + - 修复了 HTTP API 在集群中存在不同版本 TiCDC 节点时无法正常工作的问题 [#3483](https://github.com/pingcap/tiflow/issues/3483) + - 修复了当 TiCDC Redo Log 配置在 S3 存储上时 TiCDC 异常退出问题 [#3523](https://github.com/pingcap/tiflow/issues/3523) + - 修复不支持同步默认值的问题 [#3793](https://github.com/pingcap/tiflow/issues/3793) + - 修复 MySQL sink 在禁用 `batch-replace-enable` 参数时生成重复 `replace` SQL 语句的错误 [#4501](https://github.com/pingcap/tiflow/issues/4501) + - 修复部分 syncer metrics 只有在查询状态时才得以更新的问题 [#4281](https://github.com/pingcap/tiflow/issues/4281) + - 修复当 Kafka 为下游时 `txn_batch_size` 监控指标数据不准确的问题 [#3431](https://github.com/pingcap/tiflow/issues/3431) + - 修复当 `min.insync.replicas` 小于 `replication-factor` 时不能同步的问题 [#3994](https://github.com/pingcap/tiflow/issues/3994) + - 修复当 Kafka 为下游时 `txn_batch_size` 监控指标数据不准确的问题 [#3431](https://github.com/pingcap/tiflow/issues/3431) + - 修复在移除同步任务后潜在的 panic 问题 [#3128](https://github.com/pingcap/tiflow/issues/3128) + - 修复潜在的同步流控死锁问题 [#4055](https://github.com/pingcap/tiflow/issues/4055) + - 修复人为删除 etcd 任务的状态时导致 TiCDC panic 的问题 [#2980](https://github.com/pingcap/tiflow/issues/2980) + - 修复 DDL 特殊注释导致的同步停止的问题 [#3755](https://github.com/pingcap/tiflow/issues/3755) + - Kafka sink 模块添加默认的元数据获取超时时间配置 (`config.Metadata.Timeout`) [#3352](https://github.com/pingcap/tiflow/issues/3352) + - 修复 `cdc server` 命令在 Red Hat Enterprise Linux 系统的部分版本(如 6.8、6.9 等)上运行时出现时区错误的问题 [#3584](https://github.com/pingcap/tiflow/issues/3584) + - 修复集群升级后 `stopped` 状态的 changefeed 自动恢复的问题 [#3473](https://github.com/pingcap/tiflow/issues/3473) + - 修复不支持同步默认值的问题 [#3793](https://github.com/pingcap/tiflow/issues/3793) + - 修复 MySQL sink 模块出现死锁时告警过于频繁的问题 [#2706](https://github.com/pingcap/tiflow/issues/2706) + - 修复 Canal 协议下 TiCDC 没有自动开启 `enable-old-value` 选项的问题 [#3676](https://github.com/pingcap/tiflow/issues/3676) + - 修复 Avro sink 模块不支持解析 JSON 类型列的问题 [#3624](https://github.com/pingcap/tiflow/issues/3624) + - 修复监控 checkpoint lag 出现负值的问题 [#3010](https://github.com/pingcap/tiflow/issues/3010) + + + TiDB Data Migration (DM) + + - 修复了 DM 的 master/worker 线程以特定顺序重启后中继状态错误的问题 [#3478](https://github.com/pingcap/tiflow/issues/3478) + - 修复了 DM worker 在重启后无法完成初始化的问题 [#3344](https://github.com/pingcap/tiflow/issues/3344) + - 修复了 DM 任务在分区表相关 DDL 执行时间过长时失败的问题 [#3854](https://github.com/pingcap/tiflow/issues/3854) + - 修复了 DM 在上游是 MySQL 8.0 时报错 "invalid sequence" 的问题 [#3847](https://github.com/pingcap/tiflow/issues/3847) + - 修复了 DM 采用细粒度失败重试策略导致数据丢失问题 [#3487](https://github.com/pingcap/tiflow/issues/3487) + - 修复 `CREATE VIEW` 语句中断复制任务的问题 [#4173](https://github.com/pingcap/tiflow/issues/4173) + - 修复 skip DDL 后需要重置 Schema 的问题 [#4177](https://github.com/pingcap/tiflow/issues/4177) + + + TiDB Lightning + + - 修复在某些导入操作没有包含源文件时,TiDB Lightning 不会删除 metadata schema 的问题 [#28144](https://github.com/pingcap/tidb/issues/28144) + - 修复存储 URL 前缀为 "gs://xxx" 而不是 "gcs://xxx" 时,TiDB Lightning 报错的问题 [#32742](https://github.com/pingcap/tidb/issues/32742) + - 修复设置 --log-file="-" 时,没有 log 输出到 stdout 的问题 [#29876](https://github.com/pingcap/tidb/issues/29876) + - 修复 S3 存储路径不存在时 TiDB Lightning 不报错的问题 [#30709](https://github.com/pingcap/tidb/issues/30709) diff --git a/ticdc/ticdc-open-api-v2.md b/ticdc/ticdc-open-api-v2.md new file mode 100644 index 0000000000000..d31c1af578a60 --- /dev/null +++ b/ticdc/ticdc-open-api-v2.md @@ -0,0 +1,1123 @@ +--- +title: TiCDC OpenAPI v2 +summary: 了解如何使用 OpenAPI v2 接口来管理集群状态和数据同步。 +--- + +# TiCDC OpenAPI v2 + +TiCDC 提供 OpenAPI 功能,你可以通过 OpenAPI v2 对 TiCDC 集群进行查询和运维操作。OpenAPI 的功能是 [`cdc cli` 工具](/ticdc/ticdc-manage-changefeed.md)的一个子集。 + +> **注意:** +> +> TiCDC OpenAPI v1 将在未来版本中被删除。推荐使用 TiCDC OpenAPI v2。 + +你可以通过 OpenAPI 完成 TiCDC 集群的如下运维操作: + +- [获取 TiCDC 节点状态信息](#获取-ticdc-节点状态信息) +- [检查 TiCDC 集群的健康状态](#检查-ticdc-集群的健康状态) +- [创建同步任务](#创建同步任务) +- [删除同步任务](#删除同步任务) +- [更新同步任务配置](#更新同步任务配置) +- [查询同步任务列表](#查询同步任务列表) +- [查询特定同步任务](#查询特定同步任务) +- [暂停同步任务](#暂停同步任务) +- [恢复同步任务](#恢复同步任务) +- [查询同步子任务列表](#查询同步子任务列表) +- [查询特定同步子任务](#查询特定同步子任务) +- [查询 TiCDC 服务进程列表](#查询-ticdc-服务进程列表) +- [驱逐 owner 节点](#驱逐-owner-节点) +- [动态调整 TiCDC Server 日志级别](#动态调整-ticdc-server-日志级别) + +所有 API 的请求体与返回值统一使用 JSON 格式数据。请求如果成功,则统一返回 `200 OK`。本文档以下部分描述当前提供的 API 的具体使用方法。 + +在下文的示例描述中,假设 TiCDC server 的监听 IP 地址为 `127.0.0.1`,端口为 `8300`。在启动 TiCDC server 时,可以通过 `--addr=ip:port` 指定 TiCDC 绑定的 IP 地址和端口。 + +## API 统一错误格式 + +对 API 发起请求后,如发生错误,返回错误信息的格式如下所示: + +```json +{ + "error_msg": "", + "error_code": "" +} +``` + +如上所示,`error_msg` 描述错误信息,`error_code` 则是对应的错误码。 + +## API List 接口统一返回格式 + +一个 API 请求如果返回是一个资源列表(例如,返回所有的服务进程 `Captures`),TiCDC 统一的返回格式如下: + +```json +{ + "total": 2, + "items": [ + { + "id": "d2912e63-3349-447c-90ba-wwww", + "is_owner": true, + "address": "127.0.0.1:8300" + }, + { + "id": "d2912e63-3349-447c-90ba-xxxx", + "is_owner": false, + "address": "127.0.0.1:8302" + } + ] +} +``` + +如上所示: + +- `total`: 表示有一共有多少个资源。 +- `items`: 当次请求返回的资源在这个数组里,数组所有的元素都是同种资源。 + +## 获取 TiCDC 节点状态信息 + +该接口是一个同步接口,请求成功会返回对应节点的状态信息。 + +### 请求 URI + +`GET /api/v2/status` + +### 使用样例 + +以下请求会获取 IP 地址为 `127.0.0.1`,端口号为 `8300` 的 TiCDC 节点的状态信息。 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/status +``` + +```json +{ + "version": "v7.5.2", + "git_hash": "10413bded1bdb2850aa6d7b94eb375102e9c44dc", + "id": "d2912e63-3349-447c-90ba-72a4e04b5e9e", + "pid": 1447, + "is_owner": true, + "liveness": 0 +} +``` + +以上返回信息的字段解释如下: + +- `version`:当前 TiCDC 版本号。 +- `git_hash`:Git 哈希值。 +- `id`:该节点的 capture ID。 +- `pid`:该节点 capture 进程的 PID。 +- `is_owner`:表示该节点是否是 owner。 +- `liveness`: 该节点是否在线。`0` 表示正常,`1` 表示处于 `graceful shutdown` 状态。 + +## 检查 TiCDC 集群的健康状态 + +该接口是一个同步接口,在集群健康的时候会返回 `200 OK`。 + +### 请求 URI + +`GET /api/v2/health` + +### 使用样例 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/health +``` + +如果集群健康,则返回 `200 OK` 和一个空的 json {}: + +```json +{} +``` + +如果集群不健康,则返回错误信息。 + +## 创建同步任务 + +该接口用于向 TiCDC 提交一个同步任务,请求成功会返回 `200 OK`。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。 + +### 请求 URI + +`POST /api/v2/changefeeds` + +### 参数说明 + +```json +{ + "changefeed_id": "string", + "replica_config": { + "bdr_mode": true, + "case_sensitive": false, + "check_gc_safe_point": true, + "consistent": { + "flush_interval": 0, + "level": "string", + "max_log_size": 0, + "storage": "string" + }, + "enable_old_value": true, + "enable_sync_point": true, + "filter": { + "do_dbs": [ + "string" + ], + "do_tables": [ + { + "database_name": "string", + "table_name": "string" + } + ], + "event_filters": [ + { + "ignore_delete_value_expr": "string", + "ignore_event": [ + "string" + ], + "ignore_insert_value_expr": "string", + "ignore_sql": [ + "string" + ], + "ignore_update_new_value_expr": "string", + "ignore_update_old_value_expr": "string", + "matcher": [ + "string" + ] + } + ], + "ignore_dbs": [ + "string" + ], + "ignore_tables": [ + { + "database_name": "string", + "table_name": "string" + } + ], + "ignore_txn_start_ts": [ + 0 + ], + "rules": [ + "string" + ] + }, + "force_replicate": true, + "ignore_ineligible_table": true, + "memory_quota": 0, + "mounter": { + "worker_num": 0 + }, + "sink": { + "column_selectors": [ + { + "columns": [ + "string" + ], + "matcher": [ + "string" + ] + } + ], + "csv": { + "delimiter": "string", + "include_commit_ts": true, + "null": "string", + "quote": "string" + }, + "date_separator": "string", + "dispatchers": [ + { + "matcher": [ + "string" + ], + "partition": "string", + "topic": "string" + } + ], + "enable_partition_separator": true, + "encoder_concurrency": 0, + "protocol": "string", + "schema_registry": "string", + "terminator": "string", + "transaction_atomicity": "string" + }, + "sync_point_interval": "string", + "sync_point_retention": "string" + }, + "sink_uri": "string", + "start_ts": 0, + "target_ts": 0 +} +``` + +参数说明如下: + +| 参数名 | 说明 | +|:-----------------|:------------------------------------------------------------------------------------| +| `changefeed_id` | `STRING` 类型,同步任务的 ID。(非必选) | +| `replica_config` | 同步任务的配置参数。(非必选) | +| **`sink_uri`** | `STRING` 类型,同步任务下游的地址。(**必选**) | +| `start_ts` | `UINT64` 类型,指定 changefeed 的开始 TSO。TiCDC 集群将从这个 TSO 开始拉取数据。默认为当前时间。(非必选) | +| `target_ts` | `UINT64` 类型,指定 changefeed 的目标 TSO。达到这个 TSO 后,TiCDC 集群将停止拉取数据。默认为空,即 TiCDC 不会自动停止。(非必选) | + +`changefeed_id`、`start_ts`、`target_ts`、`sink_uri` 的含义和格式与[使用 cli 创建同步任务](/ticdc/ticdc-manage-changefeed.md#创建同步任务)中所作的解释相同,具体解释请参见该文档。需要注意,当在 `sink_uri` 中指定证书的路径时,须确保已将对应证书上传到对应的 TiCDC server 上。 + +`replica_config` 参数说明如下: + +| 参数名 | 说明 | +|:--------------------------|:---------------------------------------------------------------------------------------------------------| +| `bdr_mode` | `BOOLEAN` 类型,是否开启[双向同步复制](/ticdc/ticdc-bidirectional-replication.md)。默认值为 `false`。(非必选) | +| `case_sensitive` | `BOOLEAN` 类型,过滤表名时大小写是否敏感。自 v7.5.0 起,默认值由 `true` 改为 `false`。(非必选) | +| `check_gc_safe_point` | `BOOLEAN` 类型,是否检查同步任务的开始时间早于 GC 时间,默认值为 `true`。(非必选) | +| `consistent` | Redo log 配置。(非必选) | +| `enable_sync_point` | `BOOLEAN` 类型,是否开启 `sync point` 功能。(非必选) | +| `filter` | filter 配置。(非必选) | +| `force_replicate` | `BOOLEAN` 类型,该值默认为 `false`,当指定为 `true` 时,同步任务会尝试强制同步没有唯一索引的表。(非必选) | +| `ignore_ineligible_table` | `BOOLEAN` 类型,该值默认为 `false`,当指定为 `true` 时,同步任务会忽略无法进行同步的表。(非必选) | +| `memory_quota` | `UINT64` 类型,同步任务的内存 quota。(非必选) | +| `mounter` | 同步任务 `mounter` 配置。(非必选) | +| `sink` | 同步任务的`sink`配置。(非必选) | +| `sync_point_interval` | `STRING` 类型,注意返回值为 `UINT64` 类型的纳秒级时间,`sync point` 功能开启时,对齐上下游 snapshot 的时间间隔。默认值为 `10m`,最小值为 `30s`。(非必选) | +| `sync_point_retention` | `STRING` 类型,注意返回值为 `UINT64` 类型的纳秒级时间,`sync point` 功能开启时,在下游表中保存的数据的时长,超过这个时间的数据会被清理。默认值为 `24h`。(非必选) | + +`consistent` 参数说明如下: + +| 参数名 | 说明 | +|:-----------------|:---------------------------------------| +| `flush_interval` | `UINT64` 类型,redo log 文件 flush 间隔。(非必选) | +| `level` | `STRING` 类型,同步数据的一致性级别。(非必选) | +| `max_log_size` | `UINT64` 类型,redo log 的最大值。(非必选) | +| `storage` | `STRING` 类型,存储的目标地址。(非必选) | +| `use_file_backend` | `BOOL` 类型,是否将 redo log 存储到本地文件中。(非必选) | +| `encoding_worker_num` | `INT` 类型,redo 模块中编解码 worker 的数量。(非必选)| +|`flush_worker_num` | `INT` 类型,redo 模块中上传文件 worker 的数量。(非必选) | +| `compression` | `STRING` 类型,redo 文件的压缩行为,可选值为 `""` 和 `"lz4"`。默认值为 `""`,表示不进行压缩。(非必选) | +| `flush_concurrency` | `INT` 类型,redo log 上传单个文件的并发数,默认值为 1,表示禁用并发。(非必选) | + +`filter` 参数说明如下: + +| 参数名 | 说明 | +|:----------------------|:--------------------------------------------------------------------------------------| +| `do_dbs` | `STRING ARRAY` 类型,需要同步的数据库。(非必选) | +| `do_tables` | 需要同步的表。(非必选) | +| `ignore_dbs` | `STRING ARRAY` 类型,不同步的数据库。(非必选) | +| `ignore_tables` | 不同步的表。(非必选) | +| `event_filters` | event 过滤配置。(非必选) | +| `ignore_txn_start_ts` | `UINT64 ARRAY` 类型,指定之后会忽略指定 `start_ts` 的事务,如 `[1, 2]`。(非必选) | +| `rules` | `STRING ARRAY` 类型,表库过滤的规则,如 `['foo*.*', 'bar*.*']`。详情请参考[表库过滤](/table-filter.md)。(非必选) | + +`filter.event_filters` 参数说明如下,可参考[日志过滤器](/ticdc/ticdc-filter.md)。 + +| 参数名 | 说明 | +|:-------------------------------|:--------------------------------------------------------------------------------------------| +| `ignore_delete_value_expr` | `STRING ARRAY` 类型,如 `"name = 'john'"` 表示过滤掉包含 `name = 'john'` 条件的 DELETE DML。(非必选) | +| `ignore_event` | `STRING ARRAY` 类型,如 `["insert"]` 表示过滤掉 INSERT 事件。(非必选) | +| `ignore_insert_value_expr` | `STRING ARRAY` 类型,如 `"id >= 100"` 表示过滤掉包含 `id >= 100` 条件的 INSERT DML。(非必选) | +| `ignore_sql` | `STRING ARRAY` 类型,如 `["^drop", "add column"]` 表示过滤掉以 `DROP` 开头或者包含 `ADD COLUMN` 的 DDL。(非必选) | +| `ignore_update_new_value_expr` | `STRING ARRAY` 类型,如 `"gender = 'male'"` 表示过滤掉新值 `gender = 'male'` 的 UPDATE DML。(非必选) | +| `ignore_update_old_value_expr` | `STRING ARRAY` 类型,如 `"age < 18"` 表示过滤掉旧值 `age < 18` 的 UPDATE DML。(非必选) | +| `matcher` | `STRING ARRAY` 类型,是一个白名单,如 `["test.worker"]`,表示该过滤规则只应用于 `test` 库中的 `worker` 表。(非必选) | + +`mounter` 参数说明如下: + +| 参数名 | 说明 | +|:-------------|:-----------------------------------------------------------| +| `worker_num` | `INT` 类型。Mounter 线程数,Mounter 用于解码 TiKV 输出的数据,默认值为 `16`。(非必选) | + +`sink` 参数说明如下: + +| 参数名 | 说明 | +|:------------------------------|:---------------------------------------------------------------------------------------------------| +| `column_selectors` | column selector 配置。(非必选) | +| `csv` | CSV 配置。(非必选) | +| `date_separator` | `STRING` 类型,文件路径的日期分隔类型。可选类型有 `none`、`year`、`month` 和 `day`。默认值为 `none`,即不使用日期分隔。(非必选) | +| `dispatchers` | 事件分发配置数组。(非必选) | +| `encoder_concurrency` | `INT` 类型。MQ sink 中编码器的线程数。默认值为 `16`。(非必选) | +| `protocol` | `STRING` 类型,对于 MQ 类的 Sink,可以指定消息的协议格式。目前支持以下协议:`canal-json`、`open-protocol` 和 `avro`。 | +| `schema_registry` | `STRING` 类型,schema registry 地址。(非必选) | +| `terminator` | `STRING` 类型,换行符,用来分隔两个数据变更事件。默认值为空,表示使用 `"\r\n"` 作为换行符。(非必选) | +| `transaction_atomicity` | `STRING` 类型,事务一致性等级。(非必选) | +| `only_output_updated_columns` | `BOOLEAN` 类型,对于 MQ 类型的 Sink 中的 `canal-json` 和 `open-protocol`,表示是否只向下游同步有内容更新的列。默认值为 `false`。(非必选) | +| `cloud_storage_config` | storage sink 配置。(非必选) | +| `open` | Open Protocol 配置,从 v7.5.2 开始支持。(非必选) | + +`sink.column_selectors` 是一个数组,元素参数说明如下: + +| 参数名 | 说明 | +|:----------|:------------------------------------------| +| `columns` | `STRING ARRAY` 类型,column 数组。 | +| `matcher` | `STRING ARRAY` 类型,matcher 配置,匹配语法和过滤器规则的语法相同。 | + +`sink.csv` 参数说明如下: + +| 参数名 | 说明 | +|:-------------------------|:------------------------------------------------| +| `delimiter` | `STRING` 类型,字段之间的分隔符。必须为 ASCII 字符,默认值为 `,`。 | +| `include_commit_ts` | `BOOLEAN` 类型,是否在 CSV 行中包含 commit-ts。默认值为 `false`。 | +| `null` | `STRING` 类型,如果这一列是 null,那这一列该如何表示。默认是用 `\N` 来表示。 | +| `quote` | `STRING` 类型,用于包裹字段的引号字符。空值代表不使用引号字符。默认值为 `"`。 | +| `binary_encoding_method` | `STRING` 类型,二进制类型数据的编码方式,可选 `"base64"` 或 `"hex"`。默认值为 `"base64"`。 | + +`sink.dispatchers`:对于 MQ 类的 Sink,可以通过该参数配置 event 分发器,支持以下分发器:`default`、`ts`、`index-value`、`table` 。分发规则如下: + +- `default`:按照 table 分发。 +- `ts`:以行变更的 commitTs 做 Hash 计算并进行 event 分发。 +- `index-value`:以所选的 HandleKey 列名和列值做 Hash 计算并进行 event 分发。 +- `table`:以表的 schema 名和 table 名做 Hash 计算并进行 event 分发。 + +`sink.dispatchers` 是一个数组,元素参数说明如下: + +| 参数名 | 说明 | +|:------------|:----------------------------------| +| `matcher` | `STRING ARRAY` 类型,匹配语法和过滤器规则的语法相同。 | +| `partition` | `STRING` 类型,事件分发的目标 partition。 | +| `topic` | `STRING` 类型,事件分发的目标 topic。 | + +`sink.cloud_storage_config` 参数说明如下: + +| 参数名 | 说明 | +|:------------|:----------------------------------| +| `worker_count` | `INT` 类型,向下游存储服务保存数据变更记录的并发度。 | +| `flush_interval` | `STRING` 类型,向下游存储服务保存数据变更记录的间隔。 | +| `file_size` | `INT` 类型,单个数据变更文件的字节数超过 `file-size` 时将其保存至存储服务中。 | +| `file_expiration_days` | `INT` 类型,文件保留的时长。| +| `file_cleanup_cron_spec` | `STRING` 类型,定时清理任务的运行周期,与 crontab 配置兼容,格式为 ` `。| +| `flush_concurrency` | `INT` 类型,上传单个文件的并发数。| +| `output_raw_change_event` | `BOOLEAN` 类型,控制使用非 MySQL Sink 时是否输出原始的数据变更事件。| + +`sink.open` 参数说明如下: + +| 参数名 | 说明 | +|:-------------------|:--------------------------------------------------------------| +| `output_old_value` | `BOOLEAN` 类型,是否输出行数据更改前的值。默认值为 `true`。关闭后,Update 事件不会输出 "p" 字段的数据。 | + +### 使用样例 + +以下请求会创建一个 ID 为 `test5`,sink_uri 为 `blackhole://` 的同步任务。 + +```shell +curl -X POST -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v2/changefeeds -d '{"changefeed_id":"test5","sink_uri":"blackhole://"}' +``` + +如果请求成功,则返回 `200 OK`。如果请求失败,则返回错误信息和错误码。 + +### 响应体格式 + +```json +{ + "admin_job_type": 0, + "checkpoint_time": "string", + "checkpoint_ts": 0, + "config": { + "bdr_mode": true, + "case_sensitive": false, + "check_gc_safe_point": true, + "consistent": { + "flush_interval": 0, + "level": "string", + "max_log_size": 0, + "storage": "string" + }, + "enable_old_value": true, + "enable_sync_point": true, + "filter": { + "do_dbs": [ + "string" + ], + "do_tables": [ + { + "database_name": "string", + "table_name": "string" + } + ], + "event_filters": [ + { + "ignore_delete_value_expr": "string", + "ignore_event": [ + "string" + ], + "ignore_insert_value_expr": "string", + "ignore_sql": [ + "string" + ], + "ignore_update_new_value_expr": "string", + "ignore_update_old_value_expr": "string", + "matcher": [ + "string" + ] + } + ], + "ignore_dbs": [ + "string" + ], + "ignore_tables": [ + { + "database_name": "string", + "table_name": "string" + } + ], + "ignore_txn_start_ts": [ + 0 + ], + "rules": [ + "string" + ] + }, + "force_replicate": true, + "ignore_ineligible_table": true, + "memory_quota": 0, + "mounter": { + "worker_num": 0 + }, + "sink": { + "column_selectors": [ + { + "columns": [ + "string" + ], + "matcher": [ + "string" + ] + } + ], + "csv": { + "delimiter": "string", + "include_commit_ts": true, + "null": "string", + "quote": "string" + }, + "date_separator": "string", + "dispatchers": [ + { + "matcher": [ + "string" + ], + "partition": "string", + "topic": "string" + } + ], + "enable_partition_separator": true, + "encoder_concurrency": 0, + "protocol": "string", + "schema_registry": "string", + "terminator": "string", + "transaction_atomicity": "string" + }, + "sync_point_interval": "string", + "sync_point_retention": "string" + }, + "create_time": "string", + "creator_version": "string", + "error": { + "addr": "string", + "code": "string", + "message": "string" + }, + "id": "string", + "resolved_ts": 0, + "sink_uri": "string", + "start_ts": 0, + "state": "string", + "target_ts": 0, + "task_status": [ + { + "capture_id": "string", + "table_ids": [ + 0 + ] + } + ] +} +``` + +参数说明如下: + +| 参数名 | 说明 | +|:------------------|:---------------------------------------------------------------| +| `admin_job_type` | `INTEGER` 类型,admin 事件类型 | +| `checkpoint_time` | `STRING` 类型,同步任务当前 checkpoint 的格式化时间表示 | +| `checkpoint_ts` | `STRING` 类型,同步任务当前 checkpoint 的 TSO 表示 | +| `config` | 同步任务配置,结构和含义与创建同步任务中的 `replica_config` 配置项相同 | +| `create_time` | `STRING` 类型,同步任务创建的时间 | +| `creator_version` | `STRING` 类型,同步任务创建时 TiCDC 的版本 | +| `error` | 同步任务错误 | +| `id` | `STRING` 类型,同步任务 ID | +| `resolved_ts` | `UINT64` 类型,同步任务 resolved ts | +| `sink_uri` | `STRING` 类型,同步任务的 sink uri | +| `start_ts` | `UINT64` 类型,同步任务 start ts | +| `state` | `STRING` 类型,同步任务状态,状态可分为 `normal`、`stopped`、`error`、`failed`、`finished` | +| `target_ts` | `UINT64` 类型,同步任务的 target ts | +| `task_status` | 同步任务分发的详细状态 | + +`task_status` 参数说明如下: + +| 参数名 | 说明 | +|:-------------|:--------------------------------------------| +| `capture_id` | `STRING` 类型,`Capture` ID | +| `table_ids` | `UINT64 ARRAY` 类型,该 Capture 上正在同步的 table 的 ID | + +`error` 参数说明如下: + +| 参数名 | 说明 | +|:-------|:-------------------------| +| `addr` | `STRING` 类型,`Capture` 地址 | +| `code` | `STRING` 类型,错误码 | +| `message` | `STRING` 类型,错误的详细信息 | + +## 删除同步任务 + +该接口是幂等的(即其任意多次执行所产生的影响均与一次执行的影响相同),用于删除一个 changefeed 同步任务,请求成功会返回 `200 OK`。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。 + +### 请求 URI + +`DELETE /api/v2/changefeeds/{changefeed_id}` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:----------------------------| +| `changefeed_id` | 需要删除的同步任务 (changefeed) 的 ID | + +### 使用样例 + +以下请求会删除 ID 为 `test1` 的同步任务。 + +```shell +curl -X DELETE http://127.0.0.1:8300/api/v2/changefeeds/test1 +``` + +如果请求成功,则返回 `200 OK`。如果请求失败,则返回错误信息和错误码。 + +## 更新同步任务配置 + +该接口用于更新一个同步任务,请求成功会返回 `200 OK`。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。 + +修改 changefeed 配置需要按照`暂停任务 -> 修改配置 -> 恢复任务`的流程。 + +### 请求 URI + +`PUT /api/v2/changefeeds/{changefeed_id}` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:----------------------------| +| `changefeed_id` | 需要更新的同步任务 (changefeed) 的 ID | + +#### 请求体参数 + +```json +{ + "replica_config": { + "bdr_mode": true, + "case_sensitive": false, + "check_gc_safe_point": true, + "consistent": { + "flush_interval": 0, + "level": "string", + "max_log_size": 0, + "storage": "string" + }, + "enable_old_value": true, + "enable_sync_point": true, + "filter": { + "do_dbs": [ + "string" + ], + "do_tables": [ + { + "database_name": "string", + "table_name": "string" + } + ], + "event_filters": [ + { + "ignore_delete_value_expr": "string", + "ignore_event": [ + "string" + ], + "ignore_insert_value_expr": "string", + "ignore_sql": [ + "string" + ], + "ignore_update_new_value_expr": "string", + "ignore_update_old_value_expr": "string", + "matcher": [ + "string" + ] + } + ], + "ignore_dbs": [ + "string" + ], + "ignore_tables": [ + { + "database_name": "string", + "table_name": "string" + } + ], + "ignore_txn_start_ts": [ + 0 + ], + "rules": [ + "string" + ] + }, + "force_replicate": true, + "ignore_ineligible_table": true, + "memory_quota": 0, + "mounter": { + "worker_num": 0 + }, + "sink": { + "column_selectors": [ + { + "columns": [ + "string" + ], + "matcher": [ + "string" + ] + } + ], + "csv": { + "delimiter": "string", + "include_commit_ts": true, + "null": "string", + "quote": "string" + }, + "date_separator": "string", + "dispatchers": [ + { + "matcher": [ + "string" + ], + "partition": "string", + "topic": "string" + } + ], + "enable_partition_separator": true, + "encoder_concurrency": 0, + "protocol": "string", + "schema_registry": "string", + "terminator": "string", + "transaction_atomicity": "string" + }, + "sync_point_interval": "string", + "sync_point_retention": "string" + }, + "sink_uri": "string", + "target_ts": 0 +} +``` + +目前仅支持通过 API 修改同步任务的如下配置。 + +| 参数名 | 说明 | +|:-----------------|:----------------------------------------| +| `target_ts` | `UINT64` 类型,指定 changefeed 的目标 TSO。(非必选) | +| `sink_uri` | `STRING` 类型,同步任务下游的地址。(非必选) | +| `replica_config` | sink 的配置参数, 必须是完整的配置。(非必选) | + +以上参数含义与[创建同步任务](#创建同步任务)中的参数相同,此处不再赘述。 + +### 使用样例 + +以下请求会更新 ID 为 `test1` 的同步任务的 `target_ts` 为 `32`。 + +```shell + curl -X PUT -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v2/changefeeds/test1 -d '{"target_ts":32}' +``` + +若是请求成功,则返回 `200 OK`,若请求失败,则返回错误信息和错误码。响应的 JSON 格式以及字段含义与[创建同步任务](#创建同步任务)中的响应参数相同,此处不再赘述。 + +## 查询同步任务列表 + +该接口是一个同步接口,请求成功会返回 TiCDC 集群中所有同步任务 (changefeed) 的基本信息。 + +### 请求 URI + +`GET /api/v2/changefeeds` + +### 参数说明 + +#### 查询参数 + +| 参数名 | 说明 | +|:--------|:------------------------| +| `state` | 非必选,指定后将会只返回该状态的同步任务的信息 | + +`state` 可选值为 `all`、`normal`、`stopped`、`error`、`failed`、`finished`。 + +若不指定该参数,则默认返回处于 `normal`、`stopped`、`failed` 状态的同步任务基本信息。 + +### 使用样例 + +以下请求查询所有状态 (state) 为 `normal` 的同步任务的基本信息。 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/changefeeds?state=normal +``` + +```json +{ + "total": 2, + "items": [ + { + "id": "test", + "state": "normal", + "checkpoint_tso": 439749918821711874, + "checkpoint_time": "2023-02-27 23:46:52.888", + "error": null + }, + { + "id": "test2", + "state": "normal", + "checkpoint_tso": 439749918821711874, + "checkpoint_time": "2023-02-27 23:46:52.888", + "error": null + } + ] +} +``` + +以上返回的信息的说明如下: + +- `id`:同步任务的 ID。 +- `state`:同步任务当前所处的[状态](/ticdc/ticdc-changefeed-overview.md#changefeed-状态流转)。 +- `checkpoint_tso`:同步任务当前 checkpoint 的 TSO 表示。 +- `checkpoint_time`:同步任务当前 checkpoint 的格式化时间表示。 +- `error`:同步任务的错误信息。 + +## 查询特定同步任务 + +该接口是一个同步接口,请求成功会返回指定同步任务 (changefeed) 的详细信息。 + +### 请求 URI + +`GET /api/v2/changefeeds/{changefeed_id}` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:----------------------------| +| `changefeed_id` | 需要查询的同步任务 (changefeed) 的 ID | + +### 使用样例 + +以下请求会查询 ID 为 `test1` 的同步任务的详细信息。 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test1 +``` + +响应的 JSON 格式以及字段含义与[创建同步任务](#创建同步任务)中的响应参数相同,此处不再赘述。 + +## 查询特定同步任务是否完成 + +该接口是一个同步接口,请求成功后会返回指定同步任务 (changefeed) 的同步完成情况,包括是否同步完成,以及一些更详细的信息。 + +### 请求 URI + +`GET /api/v2/changefeed/{changefeed_id}/synced` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:----------------------------| +| `changefeed_id` | 需要查询的同步任务 (changefeed) 的 ID | + +### 使用样例 + +以下请求会查询 ID 为 `test1` 的同步任务的同步完成状态。 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/changefeed/test1/synced +``` + +**示例 1:同步已完成** + +```json +{ + "synced": true, + "sink_checkpoint_ts": "2023-11-30 15:14:11.015", + "puller_resolved_ts": "2023-11-30 15:14:12.215", + "last_synced_ts": "2023-11-30 15:08:35.510", + "now_ts": "2023-11-30 15:14:11.511", + "info": "Data syncing is finished" +} +``` + +以上返回的信息的说明如下: + +- `synced`:该同步任务是否已完成。`true` 表示已完成;`false` 表示不一定完成,具体状态需要结合 `info` 字段以及其他字段进行判断。 +- `sink_checkpoint_ts`:sink 模块的 checkpoint-ts 值,时间为 PD 时间。 +- `puller_resolved_ts`:puller 模块的 resolved-ts 值,时间为 PD 时间。 +- `last_synced_ts`:TiCDC 处理的最新一条数据的 commit-ts 值,时间为 PD 时间。 +- `now_ts`:当前的 PD 时间。 +- `info`:一些帮助判断同步状态的信息,特别是在 `synced` 为 `false` 时可以为你提供参考。 + +**示例 2:同步未完成** + +```json +{ + "synced": false, + "sink_checkpoint_ts": "2023-11-30 15:26:31.519", + "puller_resolved_ts": "2023-11-30 15:26:23.525", + "last_synced_ts": "2023-11-30 15:24:30.115", + "now_ts": "2023-11-30 15:26:31.511", + "info": "The data syncing is not finished, please wait" +} +``` + +此示例展示了当未完成同步任务时该接口返回的查询结果。你可以结合 `synced` 和 `info` 字段判断出数据目前还未完成同步,需要继续等待。 + +**示例 3:同步状态需要进一步判断** + +```json +{ + "synced":false, + "sink_checkpoint_ts":"2023-12-13 11:45:13.515", + "puller_resolved_ts":"2023-12-13 11:45:13.525", + "last_synced_ts":"2023-12-13 11:45:07.575", + "now_ts":"2023-12-13 11:50:24.875", + "info":"Please check whether PD is online and TiKV Regions are all available. If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. To check whether TiKV regions are all available, you can view 'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait" +} +``` + +本接口支持在上游集群遇到灾害时对同步状态进行查询判断。在部分情况下,你可能无法直接判定 TiCDC 目前的数据同步任务是否完成。此时,你可以查询该接口,并结合返回结果中的 `info` 字段以及目前上游集群的状态进行判断。 + +在此示例中,`sink_checkpoint_ts` 在时间上落后于 `now_ts`,这可能是因为 TiCDC 还在追数据,也可能是由于 PD 或者 TiKV 出现了故障。如果这是 TiCDC 还在追数据导致的,说明同步任务尚未完成。如果这是由于 PD 或者 TiKV 出现了故障导致的,说明同步任务已经完成。因此,你需要参考 `info` 中的信息对集群状态进行辅助判断。 + +**示例 4:查询报错** + +```json +{ + "error_msg": "[CDC:ErrPDEtcdAPIError]etcd api call error: context deadline exceeded", + "error_code": "CDC:ErrPDEtcdAPIError" +} +``` + +在上游集群的 PD 长时间故障后,查询该 API 接口时会返回类似如上的错误,无法提供进一步的判断信息。因为 PD 故障会直接影响 TiCDC 的数据同步,当遇到这类错误时,你可以认为 TiCDC 已经尽可能完成数据同步,但下游集群仍然可能存在因 PD 故障导致的数据丢失。 + +## 暂停同步任务 + +该接口暂停一个同步任务,请求成功会返回 `200 OK`。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。 + +### 请求 URI + +`POST /api/v2/changefeeds/{changefeed_id}/pause` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:----------------------------| +| `changefeed_id` | 需要暂停的同步任务 (changefeed) 的 ID | + +### 使用样例 + +以下请求会暂停 ID 为 `test1` 的同步任务。 + +```shell +curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/test1/pause +``` + +如果请求成功,则返回 `200 OK`。如果请求失败,则返回错误信息和错误码。 + +## 恢复同步任务 + +该接口恢复一个同步任务,请求成功会返回 `200 OK`。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。 + +### 请求 URI + +`POST /api/v2/changefeeds/{changefeed_id}/resume` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:----------------------------| +| `changefeed_id` | 需要恢复的同步任务 (changefeed) 的 ID | + +#### 请求体参数 + +```json +{ + "overwrite_checkpoint_ts": 0 +} +``` + +| 参数名 | 说明 | +|:--------------------------|:------------------------------------------------------| +| `overwrite_checkpoint_ts` | `UINT64` 类型,恢复同步任务 (changefeed) 时重新指定的 checkpoint TSO | + +### 使用样例 + +以下请求会恢复 ID 为 `test1` 的同步任务。 + +```shell +curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/test1/resume -d '{}' +``` + +如果请求成功,则返回 `200 OK`。如果请求失败,则返回错误信息和错误码。 + +## 查询同步子任务列表 + +该接口是一个同步接口,请求成功会返回当前 TiCDC 集群中的所有同步子任务 (`processor`) 的基本信息。 + +### 请求 URI + +`GET /api/v2/processors` + +### 使用样例 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/processors +``` + +```json +{ + "total": 3, + "items": [ + { + "changefeed_id": "test2", + "capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e" + }, + { + "changefeed_id": "test1", + "capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e" + }, + { + "changefeed_id": "test", + "capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e" + } + ] +} +``` + +以上返回的信息的说明如下: + +- `changefeed_id`:同步任务的 ID。 +- `capture_id`:`Capture` 的 ID。 + +## 查询特定同步子任务 + +该接口是一个同步接口,请求成功会返回指定同步子任务 (`processor`) 的详细信息。 + +### 请求 URI + +`GET /api/v2/processors/{changefeed_id}/{capture_id}` + +### 参数说明 + +#### 路径参数 + +| 参数名 | 说明 | +|:----------------|:------------------------| +| `changefeed_id` | 需要查询的子任务的 Changefeed ID | +| `capture_id` | 需要查询的子任务的 Capture ID | + +### 使用样例 + +以下请求查询 `changefeed_id` 为 `test`、`capture_id` 为 `561c3784-77f0-4863-ad52-65a3436db6af` 的同步子任务。一个同步子任务通过 `changefeed_id` 和 `capture_id` 来标识。 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/processors/test/561c3784-77f0-4863-ad52-65a3436db6af +``` + +```json +{ + "table_ids": [ + 80 + ] +} +``` + +以上返回的信息的说明如下: + +- `table_ids`:在这个 capture 上同步的 table 的 ID。 + +## 查询 TiCDC 服务进程列表 + +该接口是一个同步接口,请求成功会返回当前 TiCDC 集群中的所有服务进程 (`capture`) 的基本信息。 + +### 请求 URI + +`GET /api/v2/captures` + +### 使用样例 + +```shell +curl -X GET http://127.0.0.1:8300/api/v2/captures +``` + +```json +{ + "total": 1, + "items": [ + { + "id": "d2912e63-3349-447c-90ba-72a4e04b5e9e", + "is_owner": true, + "address": "127.0.0.1:8300" + } + ] +} +``` + +以上返回的信息的说明如下: + +- `id`:`capture` 的 ID。 +- `is_owner`:该 `capture` 是否是 owner 。 +- `address`:该 `capture` 的地址。 + +## 驱逐 owner 节点 + +该接口是一个异步的请求,请求成功会返回 `200 OK`。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。 + +### 请求 URI + +`POST /api/v2/owner/resign` + +### 使用样例 + +以下请求会驱逐 TiCDC 当前的 owner 节点,并会触发新一轮的选举,产生新的 owner 节点。 + +```shell +curl -X POST http://127.0.0.1:8300/api/v2/owner/resign +``` + +如果请求成功,则返回 `200 OK`。如果请求失败,则返回错误信息和错误码。 + +## 动态调整 TiCDC Server 日志级别 + +该接口是一个同步接口,请求成功会返回 `200 OK`。 + +### 请求 URI + +`POST /api/v2/log` + +### 请求参数 + +#### 请求体参数 + +| 参数名 | 说明 | +|:------------|:----------| +| `log_level` | 想要设置的日志等级 | + +`log_level` 支持 [zap 提供的日志级别](https://godoc.org/go.uber.org/zap#UnmarshalText):"debug"、"info"、"warn"、"error"、"dpanic"、"panic"、"fatal"。 + +### 使用样例 + +```shell +curl -X POST -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v2/log -d '{"log_level":"debug"}' +``` + +如果请求成功,则返回 `200 OK`。如果请求失败,则返回错误信息和错误码。 \ No newline at end of file diff --git a/ticdc/ticdc-open-api.md b/ticdc/ticdc-open-api.md index 503ff6f67ae22..b3882fc242338 100644 --- a/ticdc/ticdc-open-api.md +++ b/ticdc/ticdc-open-api.md @@ -161,7 +161,11 @@ curl -X GET http://127.0.0.1:8300/api/v1/health `matcher`:匹配语法和过滤器规则语法相同。 +<<<<<<< HEAD `protocal`:对于 MQ 类的 Sink,可以指定消息的协议格式。目前支持 default、canal-json(实验特性)、avro 和 maxwell 四种协议,默认为 TiCDC Open Protocol。 +======= +`protocol`:对于 MQ 类的 Sink,可以指定消息的协议格式。目前支持 `canal-json`、`open-protocol` 和 `avro` 协议。 +>>>>>>> 58f8d6b51b (cdc: remove maxwell protocol, add simple protocol (#18030) (#18171)) ### 使用样例 diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md new file mode 100644 index 0000000000000..d7f5f4a355278 --- /dev/null +++ b/ticdc/ticdc-sink-to-kafka.md @@ -0,0 +1,526 @@ +--- +title: 同步数据到 Kafka +summary: 了解如何使用 TiCDC 将数据同步到 Kafka。 +--- + +# 同步数据到 Kafka + +本文介绍如何使用 TiCDC 创建一个将增量数据复制到 Kafka 的 Changefeed。 + +## 创建同步任务,复制增量数据 Kafka + +使用以下命令来创建同步任务: + +```shell +cdc cli changefeed create \ + --server=http://10.0.10.25:8300 \ + --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \ + --changefeed-id="simple-replication-task" +``` + +```shell +Create changefeed successfully! +ID: simple-replication-task +Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null} +``` + +- `--server`:TiCDC 集群中任意一个 TiCDC 服务器的地址。 +- `--changefeed-id`:同步任务的 ID,格式需要符合正则表达式 `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`。如果不指定该 ID,TiCDC 会自动生成一个 UUID(version 4 格式)作为 ID。 +- `--sink-uri`:同步任务下游的地址,详见:[Sink URI 配置 Kafka](/ticdc/ticdc-sink-to-kafka.md#sink-uri-配置-kafka)。 +- `--start-ts`:指定 changefeed 的开始 TSO。TiCDC 集群将从这个 TSO 开始拉取数据。默认为当前时间。 +- `--target-ts`:指定 changefeed 的目标 TSO。TiCDC 集群拉取数据直到这个 TSO 停止。默认为空,即 TiCDC 不会自动停止。 +- `--config`:指定 changefeed 配置文件,详见:[TiCDC Changefeed 配置参数](/ticdc/ticdc-changefeed-config.md)。 + +## 支持的 Kafka 版本 + +TiCDC 与支持的 Kafka 最低版本对应关系如下: + +| TiCDC 版本 | 支持的 Kafka 最低版本 | +| :-----------------------| :------------------ | +| TiCDC >= v8.1.0 | 2.1.0 | +| v7.6.0 <= TiCDC < v8.1.0 | 2.4.0 | +| v7.5.2 <= TiCDC < v8.0.0 | 2.1.0 | +| v7.5.0 <= TiCDC < v7.5.2 | 2.4.0 | +| v6.5.0 <= TiCDC < v7.5.0 | 2.1.0 | +| v6.1.0 <= TiCDC < v6.5.0 | 2.0.0 | + +## Sink URI 配置 `kafka` + +Sink URI 用于指定 TiCDC 目标系统的连接信息,遵循以下格式: + +```shell +[scheme]://[userinfo@][host]:[port][/path]?[query_parameters] +``` + +一个通用的配置样例如下所示: + +```shell +--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" +``` + +URI 中可配置的的参数如下: + +| 参数 | 描述 | +| :------------------ | :------------------------------------------------------------ | +| `127.0.0.1` | 下游 Kafka 对外提供服务的 IP。 | +| `9092` | 下游 Kafka 的连接端口。 | +| `topic-name` | 变量,使用的 Kafka topic 名字。 | +| `kafka-version` | 下游 Kafka 版本号。该值需要与下游 Kafka 的实际版本保持一致。 | +| `kafka-client-id` | 指定同步任务的 Kafka 客户端的 ID(可选,默认值为 `TiCDC_sarama_producer_同步任务的 ID`)。 | +| `partition-num` | 下游 Kafka partition 数量(可选,不能大于实际 partition 数量,否则创建同步任务会失败,默认值 `3`)。| +| `max-message-bytes` | 每次向 Kafka broker 发送消息的最大数据量(可选,默认值 `10MB`)。从 v5.0.6 和 v4.0.6 开始,默认值分别从 64MB 和 256MB 调整至 10 MB。| +| `replication-factor` | Kafka 消息保存副本数(可选,默认值 `1`),需要大于等于 Kafka 中 [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) 的值。 | +| `required-acks` | 在 `Produce` 请求中使用的配置项,用于告知 broker 需要收到多少副本确认后才进行响应。可选值有:`0`(`NoResponse`:不发送任何响应,只有 TCP ACK),`1`(`WaitForLocal`:仅等待本地提交成功后再响应)和 `-1`(`WaitForAll`:等待所有同步副本提交后再响应。最小同步副本数量可通过 broker 的 [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) 配置项进行配置)。(可选,默认值为 `-1`)。 | +| `compression` | 设置发送消息时使用的压缩算法(可选值为 `none`、`lz4`、`gzip`、`snappy` 和 `zstd`,默认值为 `none`)。注意 Snappy 压缩文件必须遵循[官方 Snappy 格式](https://github.com/google/snappy)。不支持其他非官方压缩格式。| +| `protocol` | 输出到 Kafka 的消息协议,可选值有 `canal-json`、`open-protocol` 和 `avro`。 | +| `auto-create-topic` | 当传入的 `topic-name` 在 Kafka 集群不存在时,TiCDC 是否要自动创建该 topic(可选,默认值 `true`)。 | +| `enable-tidb-extension` | 可选,默认值是 `false`。当输出协议为 `canal-json` 时,如果该值为 `true`,TiCDC 会发送 [WATERMARK 事件](/ticdc/ticdc-canal-json.md#watermark-event),并在 Kafka 消息中添加 TiDB 扩展字段。从 6.1.0 开始,该参数也可以和输出协议 `avro` 一起使用。如果该值为 `true`,TiCDC 会在 Kafka 消息中添加[三个 TiDB 扩展字段](/ticdc/ticdc-avro-protocol.md#tidb-扩展字段)。| +| `max-batch-size` | 从 v4.0.9 开始引入。当消息协议支持把多条变更记录输出至一条 Kafka 消息时,该参数用于指定这一条 Kafka 消息中变更记录的最多数量。目前,仅当 Kafka 消息的 `protocol` 为 `open-protocol` 时有效(可选,默认值 `16`)。| +| `enable-tls` | 连接下游 Kafka 实例是否使用 TLS(可选,默认值 `false`)。 | +| `ca` | 连接下游 Kafka 实例所需的 CA 证书文件路径(可选)。 | +| `cert` | 连接下游 Kafka 实例所需的证书文件路径(可选)。 | +| `key` | 连接下游 Kafka 实例所需的证书密钥文件路径(可选)。 | +| `insecure-skip-verify` | 连接下游 Kafka 实例时是否跳过证书验证(可选,默认值 `false`)。 | +| `sasl-user` | 连接下游 Kafka 实例所需的 SASL/PLAIN 或 SASL/SCRAM 认证的用户名(authcid)(可选)。 | +| `sasl-password` | 连接下游 Kafka 实例所需的 SASL/PLAIN 或 SASL/SCRAM 认证的密码(可选)。如有特殊字符,需要用 URL encode 转义。 | +| `sasl-mechanism` | 连接下游 Kafka 实例所需的 SASL 认证方式的名称,可选值有 `plain`、`scram-sha-256`、`scram-sha-512` 和 `gssapi`。 | +| `sasl-gssapi-auth-type` | gssapi 认证类型,可选值有 `user` 和 `keytab`(可选)。 | +| `sasl-gssapi-keytab-path` | gssapi keytab 路径(可选)。| +| `sasl-gssapi-kerberos-config-path` | gssapi kerberos 配置路径(可选)。 | +| `sasl-gssapi-service-name` | gssapi 服务名称(可选)。 | +| `sasl-gssapi-user` | gssapi 认证使用的用户名(可选)。 | +| `sasl-gssapi-password` | gssapi 认证使用的密码(可选)。如有特殊字符,需要用 URL encode 转义。 | +| `sasl-gssapi-realm` | gssapi realm 名称(可选)。 | +| `sasl-gssapi-disable-pafxfast` | gssapi 是否禁用 PA-FX-FAST(可选)。 | +| `dial-timeout` | 和下游 Kafka 建立连接的超时时长,默认值为 `10s`。 | +| `read-timeout` | 读取下游 Kafka 返回的 response 的超时时长,默认值为 `10s`。 | +| `write-timeout` | 向下游 Kafka 发送 request 的超时时长,默认值为 `10s`。 | +| `avro-decimal-handling-mode` | 仅在输出协议是 `avro` 时有效。该参数决定了如何处理 DECIMAL 类型的字段,值可以是 `string` 或 `precise`,表明映射成字符串还是浮点数。 | +| `avro-bigint-unsigned-handling-mode` | 仅在输出协议是 `avro` 时有效。该参数决定了如何处理 BIGINT UNSIGNED 类型的字段,值可以是 `string` 或 `long`,表明映射成字符串还是 64 位整型。| + +### 最佳实践 + +* TiCDC 推荐用户自行创建 Kafka Topic,你至少需要设置该 Topic 每次向 Kafka broker 发送消息的最大数据量和下游 Kafka partition 的数量。在创建 changefeed 的时候,这两项设置分别对应 `max-message-bytes` 和 `partition-num` 参数。 +* 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 `partition-num` 和 `replication-factor` 参数自行创建 Topic。建议明确指定这两个参数。 +* 在大多数情况下,建议使用 `canal-json` 协议。 + +> **注意:** +> +> 当 `protocol` 为 `open-protocol` 时,TiCDC 会尽量避免产生长度超过 `max-message-bytes` 的消息。但如果单条数据变更记录需要超过 `max-message-bytes` 个字节来表示,为了避免静默失败,TiCDC 会试图输出这条消息并在日志中输出 Warning。 + +### TiCDC 使用 Kafka 的认证与授权 + +使用 Kafka 的 SASL 认证时配置样例如下所示: + +- SASL/PLAIN + + ```shell + --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain" + ``` + +- SASL/SCRAM + + SCRAM-SHA-256、SCRAM-SHA-512 与 PLAIN 方式类似,只需要将 `sasl-mechanism` 指定为对应的认证方式即可。 + +- SASL/GSSAPI + + SASL/GSSAPI `user` 类型认证: + + ```shell + --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com" + ``` + + `sasl-gssapi-user` 和 `sasl-gssapi-realm` 的值与 kerberos 中指定的 [principle](https://web.mit.edu/kerberos/krb5-1.5/krb5-1.5.4/doc/krb5-user/What-is-a-Kerberos-Principal_003f.html) 有关。例如,principle 为 `alice/for-kafka@example.com`,则 `sasl-gssapi-user` 和 `sasl-gssapi-realm` 的值应该分别指定为 `alice/for-kafka` 和 `example.com`。 + + SASL/GSSAPI `keytab` 类型认证: + + ```shell + --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com" + ``` + + SASL/GSSAPI 认证方式详见 [Configuring GSSAPI](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_gssapi.html)。 + +- TLS/SSL 加密 + + 如果 Kafka broker 启用了 TLS/SSL 加密,则需要在 `--sink-uri` 中增加 `enable-tls=true` 参数值。如果需要使用自签名证书,则还需要在 `--sink-uri` 中指定 `ca`、`cert` 跟 `key` 几个参数。 + +- ACL 授权 + + TiCDC 能够正常工作所需的最小权限集合如下: + + - 对 Topic [资源类型](https://docs.confluent.io/platform/current/kafka/authorization.html#resources)的 `Create` 、`Write` 和 `Describe` 权限。 + - 对 Cluster 资源类型的 `DescribeConfigs` 权限。 + +### TiCDC 集成 Kafka Connect (Confluent Platform) + +如要使用 Confluent 提供的 [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) 向关系型或非关系型数据库传输数据,请选择 `avro` 协议,并在 `schema-registry` 中提供 [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) 的 URL。 + +配置样例如下所示: + +```shell +--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml +``` + +```shell +[sink] +dispatchers = [ + {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, +] +``` + +集成具体步骤详见[与 Confluent Cloud 进行数据集成](/ticdc/integrate-confluent-using-ticdc.md)。 + +### TiCDC 集成 AWS Glue Schema Registry + +从 v7.4.0 开始,TiCDC 支持在用户选择 Avro 协议同步数据时使用 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) 作为 Schema Registry。配置样例如下所示: + +```shell +./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml +``` + +```toml +[sink] +[sink.kafka-config.glue-schema-registry-config] +region="us-west-1" +registry-name="ticdc-test" +access-key="xxxx" +secret-access-key="xxxx" +token="xxxx" +``` + +在以上配置中,`region` 和 `registry-name` 是必填项,`access-key`、`secret-access-key` 和 `token` 是可选项。最佳实践是将 AWS 连接凭证设置为环境变量或存储在 `~/.aws/credentials` 文件中,而不是将它们设置在 changefeed 的配置文件中。 + +更多信息,请参阅 [AWS官方文档](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials)。 + +## 自定义 Kafka Sink 的 Topic 和 Partition 的分发规则 + +### Matcher 匹配规则 + +以如下示例配置文件中的 `dispatchers` 配置项为例: + +```toml +[sink] +dispatchers = [ + {matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "ts" }, + {matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value" }, + {matcher = ['test1.*', 'test5.*'], topic = "Topic 表达式 3", partition = "table"}, + {matcher = ['test6.*'], partition = "ts"} +] +``` + +- 对于匹配了 matcher 规则的表,按照对应的 topic 表达式指定的策略进行分发。例如表 test3.aa,按照 topic 表达式 2 分发;表 test5.aa,按照 topic 表达式 3 分发。 +- 对于匹配了多个 matcher 规则的表,以靠前的 matcher 对应的 topic 表达式为准。例如表 test1.aa,按照 topic 表达式 1 分发。 +- 对于没有匹配任何 matcher 的表,将对应的数据变更事件发送到 --sink-uri 中指定的默认 topic 中。例如表 test10.aa 发送到默认 topic。 +- 对于匹配了 matcher 规则但是没有指定 topic 分发器的表,将对应的数据变更发送到 --sink-uri 中指定的默认 topic 中。例如表 test6.aa 发送到默认 topic。 + +### Topic 分发器 + +Topic 分发器用 topic = "xxx" 来指定,并使用 topic 表达式来实现灵活的 topic 分发策略。topic 的总数建议小于 1000。 + +Topic 表达式的基本规则为 `[prefix][{schema}][middle][{table}][suffix]`,详细解释如下: + +- `prefix`:可选项,代表 Topic Name 的前缀。 +- `{schema}`:可选项,用于匹配库名。 +- `middle`:可选项,代表库表名之间的分隔符。 +- `{table}`:可选项,用于匹配表名。 +- `suffix`:可选项,代表 Topic Name 的后缀。 + +其中 `prefix`、`middle` 以及 `suffix` 仅允许出现大小写字母(`a-z`、`A-Z`)、数字(`0-9`)、点号(`.`)、下划线(`_`)和中划线(`-`);`{schema}`、`{table}` 均为小写,诸如 `{Schema}` 以及 `{TABLE}` 这样的占位符是无效的。 + +一些示例如下: + +- `matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"` + - 对于表 `test1.table1` 对应的数据变更事件,发送到名为 `hello_test1_table1` 的 topic 中。 + - 对于表 `test2.table2` 对应的数据变更事件,发送到名为 `hello_test2_table2` 的 topic 中。 +- `matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"` + - 对于 `test3` 下的所有表对应的数据变更事件,发送到名为 `hello_test3_world` 的 topic 中。 + - 对于 `test4` 下的所有表对应的数据变更事件,发送到名为 `hello_test4_world` 的 topic 中。 +- `matcher = ['test5.*, 'test6.*'], topic = "hard_code_topic_name"` + - 对于 `test5` 和 `test6` 下的所有表对应的数据变更事件,发送到名为 `hard_code_topic_name` 的 topic 中。你可以直接指定 topic 名称。 +- `matcher = ['*.*'], topic = "{schema}_{table}"` + - 对于 TiCDC 监听的所有表,按照“库名_表名”的规则分别分发到独立的 topic 中;例如对于 `test.account` 表,TiCDC 会将其数据变更日志分发到名为 `test_account` 的 Topic 中。 + +### DDL 事件的分发 + +#### 库级别 DDL + +诸如 `create database`、`drop database` 这类和某一张具体的表无关的 DDL,称之为库级别 DDL。对于库级别 DDL 对应的事件,被发送到 `--sink-uri` 中指定的默认 topic 中。 + +#### 表级别 DDL + +诸如 `alter table`、`create table` 这类和某一张具体的表相关的 DDL,称之为表级别 DDL。对于表级别 DDL 对应的事件,按照 dispatchers 的配置,被发送到相应的 topic 中。 + +例如,对于 `matcher = ['test.*'], topic = {schema}_{table}` 这样的 dispatchers 配置,DDL 事件分发情况如下: + +- 若 DDL 事件中涉及单张表,则将 DDL 事件原样发送到相应的 topic 中。 + - 对于 DDL 事件 `drop table test.table1`,该事件会被发送到名为 `test_table1` 的 topic 中。 +- 若 DDL 事件中涉及多张表(`rename table` / `drop table` / `drop view` 都可能涉及多张表),则将单个 DDL 事件拆分为多个发送到相应的 topic 中。 + - 对于 DDL 事件 `rename table test.table1 to test.table10, test.table2 to test.table20`,则将 `rename table test.table1 to test.table10` 的 DDL 事件发送到名为 `test_table1` 的 topic 中,将 `rename table test.table2 to test.table20` 的 DDL 事件发送到名为 `test.table2` 的 topic 中。 + +### Partition 分发器 + +partition 分发器用 `partition = "xxx"` 来指定,支持 `default`、`index-value`、`columns`、`table` 和 `ts` 共五种 partition 分发器,分发规则如下: + +- `default`:默认使用 table 分发规则。使用所属库名和表名计算 partition 编号,一张表的数据被发送到相同的 partition。单表数据只存在于一个 partition 中并保证有序,但是发送吞吐量有限,无法通过添加消费者的方式提升消费速度。 +- `index-value`:使用事件所属表的主键、唯一索引或由 `index` 配置指定的索引的值计算 partition 编号,一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。 +- `columns`:使用由 `columns` 指定的列的值计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。 +- `table`:使用事件所属的表的库名和表名计算 partition 编号。 +- `ts`:使用事件的 commitTs 计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。一条数据的多次修改可能被发送到不同的 partition 中。消费者消费进度不同可能导致消费端数据不一致。因此,消费端需要对来自多个 partition 的数据按 commitTs 排序后再进行消费。 + +以如下示例配置文件中的 `dispatchers` 配置项为例: + +```toml +[sink] +dispatchers = [ + {matcher = ['test.*'], partition = "index-value"}, + {matcher = ['test1.*'], partition = "index-value", index = "index1"}, + {matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]}, + {matcher = ['test3.*'], partition = "table"}, +] +``` + +- 任何属于库 `test` 的表均使用 `index-value` 分发规则,即使用主键或者唯一索引的值计算 partition 编号。如果有主键则使用主键,否则使用最短的唯一索引。 +- 任何属于库 `test1` 的表均使用 `index-value` 分发规则,并且使用名为 `index1` 的索引的所有列的值计算 partition 编号。如果指定的索引不存在,则报错。注意,`index` 指定的索引必须是唯一索引。 +- 任何属于库 `test2` 的表均使用 `columns` 分发规则,并且使用列 `id` 和 `a` 的值计算 partition 编号。如果任一列不存在,则报错。 +- 任何属于库 `test3` 的表均使用 `table` 分发规则。 +- 对于属于库 `test4` 的表,因为不匹配上述任何一个规则,所以使用默认的 `default`,即 `table` 分发规则。 + +如果一张表,匹配了多个分发规则,以第一个匹配的规则为准。 + +> **注意:** +> +> 从 v6.1 开始,为了明确配置项的含义,用来指定 partition 分发器的配置项由原来的 `dispatcher` 改为 `partition`,`partition` 为 `dispatcher` 的别名。例如,以下两条规则完全等价: +> +> ``` +> [sink] +> dispatchers = [ +> {matcher = ['*.*'], dispatcher = "index-value"}, +> {matcher = ['*.*'], partition = "index-value"}, +> ] +> ``` +> +> 但是 `dispatcher` 与 `partition` 不能出现在同一条规则中。例如,以下规则非法: +> +> ``` +> {matcher = ['*.*'], dispatcher = "index-value", partition = "table"}, +> ``` + +## 列选择功能 + +列选择功能支持对事件中的列进行选择,只将指定的列的数据变更事件发送到下游。 + +以如下示例配置文件中的 `column-selectors` 配置项为例: + +```toml +[sink] +column-selectors = [ + {matcher = ['test.t1'], columns = ['a', 'b']}, + {matcher = ['test.*'], columns = ["*", "!b"]}, + {matcher = ['test1.t1'], columns = ['column*', '!column1']}, + {matcher = ['test3.t'], columns = ["column?", "!column1"]}, +] +``` + +- 对于表 `test.t1`,只发送 `a` 和 `b` 两列的数据。 +- 对于属于库 `test` 的表(除 `t1` 外),发送除 `b` 列之外的所有列的数据。 +- 对于表 `test1.t1`,发送所有以 `column` 开头的列,但是不发送 `column1` 列的数据。 +- 对于表 `test3.t`,发送所有以 `column` 开头且列名长度为 7 的列,但是不发送 `column1` 列的数据。 +- 不匹配任何规则的表将不进行列过滤,发送所有列的数据。 + +> **注意:** +> +> 经过 `column-selectors` 规则过滤后,表中的数据必须要有主键或者唯一键被同步,否则在 changefeed 创建或运行时会报错。 + +## 横向扩展大单表的负载到多个 TiCDC 节点 + +该功能可以按照大单表的数据量和每分钟的修改行数将表的同步范围切分为多个,并使各个范围之间所同步的数据量和修改行数基本相同。该功能将这些范围分布到多个 TiCDC 节点进行同步,使得多个 TiCDC 节点可以同时同步大单表。该功能可以解决以下两个问题: + +- 单个 TiCDC 节点不能及时同步大单表。 +- TiCDC 节点之间资源(CPU、内存等)消耗不均匀。 + +> **警告:** +> +> TiCDC v7.0.0 仅支持在 Kafka 同步任务上开启大单表的横向扩展功能。 + +配置样例如下所示: + +```toml +[scheduler] +# 默认值为 "false",设置为 "true" 以打开该功能。 +enable-table-across-nodes = true +# 打开该功能后,该功能只对 Region 个数大于 `region-threshold` 值的表生效。 +region-threshold = 100000 +# 打开该功能后,该功能会对每分钟修改行数大于 `write-key-threshold` 值的表生效。 +# 注意: +# * 该参数默认值为 0,代表该功能默认不会按表的修改行数来切分表的同步范围。 +# * 你可以根据集群负载来配置该参数,如 30000,代表当表每分钟的更新行数超过 30000 时,该功能将会切分表的同步范围。 +# * 当 `region-threshold` 和 `write-key-threshold` 同时配置时, +# TiCDC 将优先检查修改行数是否大于 `write-key-threshold`, +# 如果不超过,则再检查 Region 个数是否大于 `region-threshold`。 +write-key-threshold = 30000 +``` + +一个表包含的 Region 个数可用如下 SQL 查询: + +```sql +SELECT COUNT(*) FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS WHERE DB_NAME="database1" AND TABLE_NAME="table1" AND IS_INDEX=0; +``` + +## 处理超过 Kafka Topic 限制的消息 + +Kafka Topic 对可以接收的消息大小有限制,该限制由 [`max.message.bytes`](https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes) 参数控制。当 TiCDC Kafka sink 在发送数据时,如果发现数据大小超过了该限制,会导致 changefeed 报错,无法继续同步数据。为了解决这个问题,TiCDC 新增一个参数 `large-message-handle-option` 并提供如下解决方案。 + +目前,如下功能支持 Canal-JSON 和 Open Protocol 两种编码协议。使用 Canal-JSON 协议时,你需要在 `sink-uri` 中设置 `enable-tidb-extension=true`。 + +### TiCDC 层数据压缩功能 + +从 v7.4.0 开始,TiCDC Kafka sink 支持在编码消息后立即对数据进行压缩,并与消息大小限制参数比较。该功能能够有效减少超过消息大小限制的情况发生。 + +配置样例如下所示: + +```toml +[sink.kafka-config.large-message-handle] +# 该参数从 v7.4.0 开始引入 +# 默认为 "none",即不开启编码时的压缩功能 +# 可选值有 "none"、"lz4"、"snappy",默认为 "none" +large-message-handle-compression = "none" +``` + +该功能和 Kafka producer 的压缩功能不同: + +* `large-message-handle-compression` 中指定的压缩算法,它启用的是对单条 Kafka 消息进行压缩,并且压缩是在与消息大小限制参数比较之前进行。 +* 用户可以在 `sink-uri` 中配置压缩算法,它所启用的压缩功能应用在整个发送数据请求,其中包含多条 Kafka 消息,并且压缩是在和消息大小限制参数比较之后进行的。 + +开启了 `large-message-handle-compression` 之后,消费者收到的消息经过特定压缩协议编码,消费者应用程序需要使用指定的压缩协议进行数据解码。 + +### 只发送 Handle Key + +从 v7.3.0 开始,TiCDC Kafka sink 支持在消息大小超过限制时只发送 Handle Key 的数据。这样可以显著减少消息的大小,避免因为消息大小超过 Kafka Topic 限制而导致 changefeed 发生错误和同步任务失败的情况。 + +Handle Key 指的是: + +* 如果被同步的表有定义主键,主键即为 Handle Key 。 +* 如果没有主键,但是有定义 Not NULL Unique Key,Unique Key 即为 Handle Key。 + +配置样例如下所示: + +```toml +[sink.kafka-config.large-message-handle] +# 该参数从 v7.3.0 开始引入 +# 默认为空,即消息超过大小限制后,同步任务失败 +# 设置为 "handle-key-only" 时,如果消息超过大小,data 字段内容只发送 handle key;如果依旧超过大小,同步任务失败 +large-message-handle-option = "handle-key-only" +``` + +### 消费只有 Handle Key 的消息 + +只有 Handle Key 数据的消息格式如下: + +```json +{ + "id": 0, + "database": "test", + "table": "tp_int", + "pkNames": [ + "id" + ], + "isDdl": false, + "type": "INSERT", + "es": 1639633141221, + "ts": 1639633142960, + "sql": "", + "sqlType": { + "id": 4 + }, + "mysqlType": { + "id": "int" + }, + "data": [ + { + "id": "2" + } + ], + "old": null, + "_tidb": { // TiDB 的扩展字段 + "commitTs": 163963314122145239, + "onlyHandleKey": true + } +} +``` + +Kafka 消费者收到消息之后,首先检查 `onlyHandleKey` 字段。如果该字段存在且为 `true`,表示该消息只包含 Handle Key 的数据。此时,你需要查询上游 TiDB,通过 [`tidb_snapshot` 读取历史数据](/read-historical-data.md)来获取完整的数据。 + +> **警告:** +> +> 在 Kafka 消费者处理数据并查询 TiDB 时,可能发生数据已经被 GC 的情况。你需要[调整 TiDB 集群的 GC Lifetime 设置](/system-variables.md#tidb_gc_life_time-从-v50-版本开始引入) 为一个较大的值,以避免该情况。 + +### 发送大消息到外部存储 + +从 v7.4.0 开始,TiCDC Kafka sink 支持在消息大小超过限制时将该条消息发送到外部存储服务,同时向 Kafka 发送一条含有该大消息在外部存储服务中地址的消息。这样可以避免因为消息大小超过 Kafka Topic 限制而导致 changefeed 失败的情况。 + +配置样例如下所示: + +```toml +[sink.kafka-config.large-message-handle] +# large-message-handle-option 从 v7.3.0 开始引入 +# 默认为 "none",即消息超过大小限制后,同步任务失败 +# 设置为 "handle-key-only" 时,如果消息超过大小,data 字段内容只发送 Handle Key。如果依旧超过大小,同步任务失败 +# 设置为 "claim-check" 时,如果消息超过大小,将该条消息发送到外部存储服务 +large-message-handle-option = "claim-check" +claim-check-storage-uri = "s3://claim-check-bucket" +``` + +当指定 `large-message-handle-option` 为 `claim-check` 时,`claim-check-storage-uri` 必须设置为一个有效的外部存储服务地址,否则创建 changefeed 将会报错。 + +> **建议:** +> +> 关于 Amazon S3、GCS 以及 Azure Blob Storage 的 URI 参数的详细参数说明,请参考[外部存储服务的 URI 格式](/external-storage-uri.md)。 + +TiCDC 不会清理外部存储服务上的消息,数据消费者需要自行管理外部存储服务。 + +### 消费外部存储中的大消息 + +Kafka 消费者会收到一条含有大消息在外部存储服务中的地址的消息,格式如下: + +```json +{ + "id": 0, + "database": "test", + "table": "tp_int", + "pkNames": [ + "id" + ], + "isDdl": false, + "type": "INSERT", + "es": 1639633141221, + "ts": 1639633142960, + "sql": "", + "sqlType": { + "id": 4 + }, + "mysqlType": { + "id": "int" + }, + "data": [ + { + "id": "2" + } + ], + "old": null, + "_tidb": { // TiDB 的扩展字段 + "commitTs": 163963314122145239, + "claimCheckLocation": "s3:/claim-check-bucket/${uuid}.json" + } +} +``` + +如果收到的消息有 `claimCheckLocation` 字段,Kafka 消费者根据该字段提供的地址读取以 JSON 格式存储的大消息数据。消息格式如下: + +```json +{ + key: "xxx", + value: "xxx", +} +``` + +`key` 和 `value` 分别存有编码后的大消息,该消息原本应该发送到 Kafka 消息中的对应字段。消费者可以通过解析这两部分的数据,还原大消息的内容。