diff --git a/TOC.md b/TOC.md
index c73ba0ad8bc5d..09630a7543a18 100644
--- a/TOC.md
+++ b/TOC.md
@@ -117,12 +117,17 @@
 - [Migrate from CSV Files](/migrate-from-csv-files-to-tidb.md)
 - [Migrate from SQL Files](/migrate-from-sql-files-to-tidb.md)
 - [Migrate from One TiDB Cluster to Another TiDB Cluster](/migrate-from-tidb-to-tidb.md)
-- [Replicate Data from TiDB to Kafka](/replicate-data-to-kafka.md)
+- [Migrate from TiDB to MySQL-compatible Databases](/migrate-from-tidb-to-mysql.md)
 - Advanced Migration
 - [Continuous Replication with gh-ost or pt-osc](/migrate-with-pt-ghost.md)
 - [Migrate to a Downstream Table with More Columns](/migrate-with-more-columns-downstream.md)
 - [Filter Binlog Events](/filter-binlog-event.md)
 - [Filter DML Events Using SQL Expressions](/filter-dml-event.md)
+- Integrate
+  - [Overview](/integration-overview.md)
+  - Integration Scenarios
+    - [Integrate with Confluent Cloud and Snowflake](/ticdc/integrate-confluent-using-ticdc.md)
+    - [Integrate with Apache Kafka and Apache Flink](/replicate-data-to-kafka.md)
 - Maintain
 - Upgrade
 - [Use TiUP (Recommended)](/upgrade-tidb-using-tiup.md)
@@ -499,7 +504,6 @@
 - [TiCDC Open Protocol](/ticdc/ticdc-open-protocol.md)
 - [TiCDC Avro Protocol](/ticdc/ticdc-avro-protocol.md)
 - [TiCDC Canal-JSON Protocol](/ticdc/ticdc-canal-json.md)
-- [Integrate TiDB with Confluent and Snowflake](/ticdc/integrate-confluent-using-ticdc.md)
 - [FAQs](/ticdc/ticdc-faq.md)
 - [Glossary](/ticdc/ticdc-glossary.md)
 - [Dumpling](/dumpling-overview.md)
diff --git a/integration-overview.md b/integration-overview.md
new file mode 100644
index 0000000000000..5b228a97dfd9b
--- /dev/null
+++ b/integration-overview.md
@@ -0,0 +1,16 @@
+---
+title: Data Integration Overview
+summary: Learn about common data integration scenarios that use TiCDC.
+---
+
+# Data Integration Overview
+
+Data integration refers to the flow, transfer, and consolidation of data among various data sources. As data volumes grow rapidly and more value is extracted from data, the demand for data integration has become increasingly urgent. To prevent TiDB from becoming a data silo and to integrate TiDB data with other platforms, TiCDC can replicate TiDB incremental data change logs to other data platforms. This document describes the data integration scenarios that use TiCDC. You can choose an integration solution that suits your business scenarios.
+
+## Integrate with Confluent Cloud
+
+You can use TiCDC to replicate incremental data from TiDB to Confluent Cloud, and replicate the data to ksqlDB, Snowflake, and SQL Server via Confluent Cloud. For details, see [Integrate with Confluent Cloud](/ticdc/integrate-confluent-using-ticdc.md).
+
+## Integrate with Apache Kafka and Apache Flink
+
+You can use TiCDC to replicate incremental data from TiDB to Apache Kafka, and consume the data using Apache Flink. For details, see [Integrate with Apache Kafka and Apache Flink](/replicate-data-to-kafka.md).
\ No newline at end of file diff --git a/media/integrate/sql-query-result.png b/media/integrate/sql-query-result.png new file mode 100644 index 0000000000000..98e88155cc30c Binary files /dev/null and b/media/integrate/sql-query-result.png differ diff --git a/migrate-aurora-to-tidb.md b/migrate-aurora-to-tidb.md index efd2ab67e902c..4e989b445e222 100644 --- a/migrate-aurora-to-tidb.md +++ b/migrate-aurora-to-tidb.md @@ -149,7 +149,7 @@ If you need to enable TLS in the TiDB cluster, refer to [TiDB Lightning Configur - Check progress in [the monitoring dashboard](/tidb-lightning/monitor-tidb-lightning.md). - Check progress in [the TiDB Lightning web interface](/tidb-lightning/tidb-lightning-web-interface.md). -4. After TiDB Lightning completes the import, it exits automatically. If you find the last 5 lines of its log print `the whole procedure completed`, the import is successful. +4. After TiDB Lightning completes the import, it exits automatically. Check whether `tidb-lightning.log` contains `the whole procedure completed` in the last lines. If yes, the import is successful. If no, the import encounters an error. Address the error as instructed in the error message. > **Note:** > diff --git a/migrate-from-csv-files-to-tidb.md b/migrate-from-csv-files-to-tidb.md index 29238b79a7e40..96424f759c5fb 100644 --- a/migrate-from-csv-files-to-tidb.md +++ b/migrate-from-csv-files-to-tidb.md @@ -127,7 +127,7 @@ After the import starts, you can check the progress of the import by either of t - Check progress in [the monitoring dashboard](/tidb-lightning/monitor-tidb-lightning.md). - Check progress in [the TiDB Lightning web interface](/tidb-lightning/tidb-lightning-web-interface.md). -After TiDB Lightning completes the import, it exits automatically. If you find the last 5 lines of its log print `the whole procedure completed`, the import is successful. +After TiDB Lightning completes the import, it exits automatically. Check whether `tidb-lightning.log` contains `the whole procedure completed` in the last lines. If yes, the import is successful. If no, the import encounters an error. Address the error as instructed in the error message. > **Note:** > diff --git a/migrate-from-sql-files-to-tidb.md b/migrate-from-sql-files-to-tidb.md index 8325589fc13eb..893ebdfc1fe82 100644 --- a/migrate-from-sql-files-to-tidb.md +++ b/migrate-from-sql-files-to-tidb.md @@ -89,7 +89,7 @@ After the import is started, you can check the progress in one of the following - Use the Grafana dashboard. For details, see [TiDB Lightning Monitoring](/tidb-lightning/monitor-tidb-lightning.md). - Use web interface. For details, see [TiDB Lightning Web Interface](/tidb-lightning/tidb-lightning-web-interface.md). -After the import is completed, TiDB Lightning automatically exits. If `the whole procedure completed` is in the last 5 lines of the log, it means that the import is successfully completed. +After the import is completed, TiDB Lightning automatically exits. Check whether `tidb-lightning.log` contains `the whole procedure completed` in the last lines. If yes, the import is successful. If no, the import encounters an error. Address the error as instructed in the error message. > **Note:** > diff --git a/migrate-from-tidb-to-mysql.md b/migrate-from-tidb-to-mysql.md new file mode 100644 index 0000000000000..330193570d03f --- /dev/null +++ b/migrate-from-tidb-to-mysql.md @@ -0,0 +1,229 @@ +--- +title: Migrate Data from TiDB to MySQL-compatible Databases +summary: Learn how to migrate data from TiDB to MySQL-compatible databases. 
+---
+
+# Migrate Data from TiDB to MySQL-compatible Databases
+
+This document describes how to migrate data from TiDB clusters to MySQL-compatible databases, such as Aurora, MySQL, and MariaDB. The whole process contains four steps:
+
+1. Set up the environment.
+2. Migrate full data.
+3. Migrate incremental data.
+4. Switch services to the new TiDB cluster.
+
+## Step 1. Set up the environment
+
+1. Deploy a TiDB cluster upstream.
+
+    Deploy a TiDB cluster by using TiUP Playground. For more information, refer to [Deploy and Maintain an Online TiDB Cluster Using TiUP](/tiup/tiup-cluster.md).
+
+    ```shell
+    # Create a TiDB cluster
+    tiup playground --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
+    # View cluster status
+    tiup status
+    ```
+
+2. Deploy a MySQL instance downstream.
+
+    - In a lab environment, you can use Docker to quickly deploy a MySQL instance by running the following command:
+
+        ```shell
+        docker run --name some-mysql -e MYSQL_ROOT_PASSWORD=my-secret-pw -p 3306:3306 -d mysql
+        ```
+
+    - In a production environment, you can deploy a MySQL instance by following instructions in [Installing MySQL](https://dev.mysql.com/doc/refman/8.0/en/installing.html).
+
+3. Simulate service workload.
+
+    In the lab environment, you can use `go-tpc` to write data to the TiDB cluster upstream. This is to generate event changes in the TiDB cluster. Run the following command to create a database named `tpcc` in the TiDB cluster, and then use TiUP bench to write data to this database.
+
+    ```shell
+    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
+    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s
+    ```
+
+    For more details about `go-tpc`, refer to [How to Run TPC-C Test on TiDB](/benchmark/benchmark-tidb-using-tpcc.md).
+
+## Step 2. Migrate full data
+
+After setting up the environment, you can use [Dumpling](/dumpling-overview.md) to export the full data from the upstream TiDB cluster.
+
+> **Note:**
+>
+> In production clusters, performing a backup with GC disabled might affect cluster performance. It is recommended that you complete this step in off-peak hours.
+
+1. Disable Garbage Collection (GC).
+
+    To ensure that newly written data is not deleted during incremental migration, you should disable GC for the upstream cluster before exporting full data. In this way, history data is not deleted.
+
+    Run the following command to disable GC:
+
+    ```sql
+    MySQL [test]> SET GLOBAL tidb_gc_enable=FALSE;
+    ```
+
+    ```
+    Query OK, 0 rows affected (0.01 sec)
+    ```
+
+    To verify that the change takes effect, query the value of `tidb_gc_enable`:
+
+    ```sql
+    MySQL [test]> SELECT @@global.tidb_gc_enable;
+    ```
+
+    ```
+    +-------------------------+
+    | @@global.tidb_gc_enable |
+    +-------------------------+
+    |                       0 |
+    +-------------------------+
+    1 row in set (0.00 sec)
+    ```
+
+2. Back up data.
+
+    1. Export data in SQL format using Dumpling:
+
+        ```shell
+        tiup dumpling -u root -P 4000 -h 127.0.0.1 --filetype sql -t 8 -o ./dumpling_output -r 200000 -F256MiB
+        ```
+
+    2. After finishing exporting data, run the following command to check the metadata. `Pos` in the metadata is the TSO of the export snapshot and can be recorded as the BackupTS.
+
+        ```shell
+        cat dumpling_output/metadata
+        ```
+
+        ```
+        Started dump at: 2022-06-28 17:49:54
+        SHOW MASTER STATUS:
+            Log: tidb-binlog
+            Pos: 434217889191428107
+            GTID:
+        Finished dump at: 2022-06-28 17:49:57
+        ```
+
+3. Restore data.
+
+    Use MyLoader (an open-source tool) to import data to the downstream MySQL instance. For details about how to install and use MyLoader, see [MyDumper/MyLoader](https://github.com/mydumper/mydumper). Run the following command to import full data exported by Dumpling to MySQL:
+
+    ```shell
+    myloader -h 127.0.0.1 -P 3306 -d ./dumpling_output/
+    ```
+
+4. (Optional) Validate data.
+
+    You can use [sync-diff-inspector](/sync-diff-inspector/sync-diff-inspector-overview.md) to check data consistency between upstream and downstream at a certain time.
+
+    ```shell
+    sync_diff_inspector -C ./config.yaml
+    ```
+
+    For details about how to configure the sync-diff-inspector, see [Configuration file description](/sync-diff-inspector/sync-diff-inspector-overview.md#configuration-file-description). In this document, the configuration is as follows:
+
+    ```toml
+    # Diff Configuration.
+    ######################### Datasource config #########################
+    [data-sources]
+    [data-sources.upstream]
+    host = "127.0.0.1" # Replace the value with the IP address of your upstream cluster
+    port = 4000
+    user = "root"
+    password = ""
+    snapshot = "434217889191428107" # Set snapshot to the actual backup time (BackupTS in the "Back up data" section in [Step 2. Migrate full data](#step-2-migrate-full-data))
+    [data-sources.downstream]
+    host = "127.0.0.1" # Replace the value with the IP address of your downstream cluster
+    port = 3306
+    user = "root"
+    password = ""
+    ######################### Task config #########################
+    [task]
+    output-dir = "./output"
+    source-instances = ["upstream"]
+    target-instance = "downstream"
+    target-check-tables = ["*.*"]
+    ```
+
+## Step 3. Migrate incremental data
+
+1. Deploy TiCDC.
+
+    After finishing full data migration, deploy and configure a TiCDC cluster to replicate incremental data. In production environments, deploy TiCDC as instructed in [Deploy TiCDC](/ticdc/deploy-ticdc.md). In this document, a TiCDC node has been started upon the creation of the test cluster. Therefore, you can skip the step of deploying TiCDC and proceed with the next step to create a changefeed.
+
+2. Create a changefeed.
+
+    In the upstream cluster, run the following command to create a changefeed from the upstream to the downstream clusters:
+
+    ```shell
+    tiup ctl:v6.1.0 cdc changefeed create --pd=http://127.0.0.1:2379 --sink-uri="mysql://root:@127.0.0.1:3306" --changefeed-id="upstream-to-downstream" --start-ts="434217889191428107"
+    ```
+
+    In this command, the parameters are as follows:
+
+    - `--pd`: PD address of the upstream cluster
+    - `--sink-uri`: URI of the downstream cluster
+    - `--changefeed-id`: changefeed ID, must be in the format of a regular expression, `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`
+    - `--start-ts`: start timestamp of the changefeed, must be the backup time (or BackupTS in the "Back up data" section in [Step 2. Migrate full data](#step-2-migrate-full-data))
+
+    For more information about the changefeed configurations, see [Task configuration file](/ticdc/manage-ticdc.md#task-configuration-file).
+
+3. Enable GC.
+
+    In incremental migration using TiCDC, GC only removes history data that is replicated. Therefore, after creating a changefeed, you need to run the following command to enable GC. For details, see [What is the complete behavior of TiCDC garbage collection (GC) safepoint?](/ticdc/ticdc-faq.md#what-is-the-complete-behavior-of-ticdc-garbage-collection-gc-safepoint).
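+
+    Before you enable GC again, you can optionally confirm that the changefeed is running properly. The following check is a sketch based on the commands used earlier in this document; it assumes the playground PD address `http://127.0.0.1:2379` and the changefeed created in the previous step:
+
+    ```shell
+    # List all changefeeds and confirm that "upstream-to-downstream" is in the normal state
+    tiup ctl:v6.1.0 cdc changefeed list --pd=http://127.0.0.1:2379
+    ```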
+
+    To enable GC, run the following command:
+
+    ```sql
+    MySQL [test]> SET GLOBAL tidb_gc_enable=TRUE;
+    ```
+
+    ```
+    Query OK, 0 rows affected (0.01 sec)
+    ```
+
+    To verify that the change takes effect, query the value of `tidb_gc_enable`:
+
+    ```sql
+    MySQL [test]> SELECT @@global.tidb_gc_enable;
+    ```
+
+    ```
+    +-------------------------+
+    | @@global.tidb_gc_enable |
+    +-------------------------+
+    |                       1 |
+    +-------------------------+
+    1 row in set (0.00 sec)
+    ```
+
+## Step 4. Switch services
+
+After creating a changefeed, data written to the upstream cluster is replicated to the downstream cluster with low latency. You can migrate read stream to the downstream cluster gradually. Observe the read stream for a period. If the downstream cluster is stable, you can switch write stream to the downstream cluster as well by performing the following steps:
+
+1. Stop write services in the upstream cluster. Make sure that all upstream data are replicated to downstream before stopping the changefeed.
+
+    ```shell
+    # Stop the changefeed from the upstream cluster to the downstream cluster
+    tiup cdc cli changefeed pause -c "upstream-to-downstream" --pd=http://127.0.0.1:2379
+    # View the changefeed status
+    tiup cdc cli changefeed list
+    ```
+
+    ```
+    [
+      {
+        "id": "upstream-to-downstream",
+        "summary": {
+          "state": "stopped",  # Ensure that the status is stopped
+          "tso": 434218657561968641,
+          "checkpoint": "2022-06-28 18:38:45.685",  # This time should be later than the time of stopping writing
+          "error": null
+        }
+      }
+    ]
+    ```
+
+2. After switching write services to the downstream cluster, observe for a period. If the downstream cluster is stable, you can take the upstream cluster offline.
diff --git a/migrate-from-tidb-to-tidb.md b/migrate-from-tidb-to-tidb.md
index a932e64fa0a10..b2d8228190d9a 100644
--- a/migrate-from-tidb-to-tidb.md
+++ b/migrate-from-tidb-to-tidb.md
@@ -26,9 +26,7 @@ This document exemplifies the whole migration process and contains the following
 1. Deploy TiDB clusters.
 
-    Deploy two TiDB clusters, one upstream and the other downstream by using tiup playground. For more information, refer to [Deploy and Maintain an Online TiDB Cluster Using TiUP](/tiup/tiup-cluster.md).
-
-    {{< copyable "shell-regular" >}}
+    Deploy two TiDB clusters, one upstream and the other downstream by using TiUP Playground. For more information, refer to [Deploy and Maintain an Online TiDB Cluster Using TiUP](/tiup/tiup-cluster.md).
 
     ```shell
     # Create an upstream cluster
@@ -43,16 +41,12 @@ This document exemplifies the whole migration process and contains the following
     By default, test databases are created in the newly deployed clusters. Therefore, you can use [sysbench](https://github.com/akopytov/sysbench#linux) to generate test data and simulate data in real scenarios.
 
-    {{< copyable "shell-regular" >}}
-
     ```shell
     sysbench oltp_write_only --config-file=./tidb-config --tables=10 --table-size=10000 prepare
     ```
 
     In this document, we use sysbench to run the `oltp_write_only` script. This script generates 10 tables in the test database, each with 10,000 rows. The tidb-config is as follows:
 
-    {{< copyable "shell-regular" >}}
-
     ```shell
     mysql-host=172.16.6.122 # Replace the value with the IP address of your upstream cluster
     mysql-port=4000
@@ -70,8 +64,6 @@ This document exemplifies the whole migration process and contains the following
 
     In real scenarios, service data is continuously written to the upstream cluster. In this document, we use sysbench to simulate this workload.
Specifically, run the following command to enable 10 workers to continuously write data to three tables, sbtest1, sbtest2, and sbtest3, with a total TPS not exceeding 100.
 
-    {{< copyable "shell-regular" >}}
-
     ```shell
     sysbench oltp_write_only --config-file=./tidb-config --tables=3 run
     ```
 
@@ -80,8 +72,6 @@ This document exemplifies the whole migration process and contains the following
 
     In full data backup, both the upstream and downstream clusters need to access backup files. It is recommended that you use [External storage](/br/backup-and-restore-storages.md) to store backup files. In this document, Minio is used to simulate an S3-compatible storage service.
 
-    {{< copyable "shell-regular" >}}
-
     ```shell
     wget https://dl.min.io/server/minio/release/linux-amd64/minio
     chmod +x minio
@@ -104,8 +94,6 @@ This document exemplifies the whole migration process and contains the following
 
     The access link is as follows:
 
-    {{< copyable "shell-regular" >}}
-
     ```shell
     s3://backup?access-key=minio&secret-access-key=miniostorage&endpoint=http://${HOST_IP}:6060&force-path-style=true
     ```
 
@@ -116,18 +104,31 @@ After setting up the environment, you can use the backup and restore functions o
 
 > **Note:**
 >
+> In production clusters, performing a backup with GC disabled might affect cluster performance. It is recommended that you back up data in off-peak hours, and set `RATE_LIMIT` to a proper value to avoid performance degradation.
+>
 > If the versions of the upstream and downstream clusters are different, you should check [BR compatibility](/br/backup-and-restore-overview.md#before-you-use-br). In this document, we assume that the upstream and downstream clusters are the same version.
 
 1. Disable GC.
 
-    To ensure that newly written data is not deleted during incremental migration, you should disable GC for the upstream cluster before backup. In this way, history data will not be deleted.
+    To ensure that newly written data is not deleted during incremental migration, you should disable GC for the upstream cluster before backup. In this way, history data is not deleted.
 
-    {{< copyable "sql" >}}
+    Run the following command to disable GC:
 
     ```sql
     MySQL [test]> SET GLOBAL tidb_gc_enable=FALSE;
+    ```
+
+    ```
     Query OK, 0 rows affected (0.01 sec)
+    ```
+
+    To verify that the change takes effect, query the value of `tidb_gc_enable`:
+
+    ```sql
     MySQL [test]> SELECT @@global.tidb_gc_enable;
+    ```
+
+    ```
     +-------------------------+
     | @@global.tidb_gc_enable |
     +-------------------------+
@@ -136,18 +137,15 @@ After setting up the environment, you can use the backup and restore functions o
     1 row in set (0.00 sec)
     ```
 
-    > **Note:**
-    >
-    > In production clusters, performing a backup with GC disabled might affect cluster performance. It is recommended that you back up data in off-peak hours, and set RATE_LIMIT to a proper value to avoid performance degradation.
-
 2. Back up data.
Run the `BACKUP` statement in the upstream cluster to back up data: - {{< copyable "sql" >}} - ```sql MySQL [(none)]> BACKUP DATABASE * TO 's3://backup?access-key=minio&secret-access-key=miniostorage&endpoint=http://${HOST_IP}:6060&force-path-style=true' RATE_LIMIT = 120 MB/SECOND; + ``` + + ``` +---------------+----------+--------------------+---------------------+---------------------+ | Destination | Size | BackupTS | Queue Time | Execution Time | +---------------+----------+--------------------+---------------------+---------------------+ @@ -162,10 +160,11 @@ After setting up the environment, you can use the backup and restore functions o Run the `RESTORE` command in the downstream cluster to restore data: - {{< copyable "sql" >}} - ```sql mysql> RESTORE DATABASE * FROM 's3://backup?access-key=minio&secret-access-key=miniostorage&endpoint=http://${HOST_IP}:6060&force-path-style=true'; + ``` + + ``` +--------------+-----------+--------------------+---------------------+---------------------+ | Destination | Size | BackupTS | Queue Time | Execution Time | +--------------+-----------+--------------------+---------------------+---------------------+ @@ -174,20 +173,16 @@ After setting up the environment, you can use the backup and restore functions o 1 row in set (41.85 sec) ``` -4. (Optional) Check data. +4. (Optional) Validate data. You can use [sync-diff-inspector](/sync-diff-inspector/sync-diff-inspector-overview.md) to check data consistency between upstream and downstream at a certain time. The preceding `BACKUP` output shows that the upstream cluster finishes backup at 431434047157698561. The preceding `RESTORE` output shows that the downstream finishes restoration at 431434141450371074. - {{< copyable "shell-regular" >}} - ```shell sync_diff_inspector -C ./config.yaml ``` For details about how to configure the sync-diff-inspector, see [Configuration file description](/sync-diff-inspector/sync-diff-inspector-overview.md#configuration-file-description). In this document, the configuration is as follows: - {{< copyable "shell-regular" >}} - ```shell # Diff Configuration. ######################### Datasource config ######################### @@ -197,7 +192,7 @@ After setting up the environment, you can use the backup and restore functions o port = 4000 user = "root" password = "" - snapshot = "431434047157698561" # Set snapshot to the actual backup time (see BackupTS in the previous step) + snapshot = "431434047157698561" # Set snapshot to the actual backup time (BackupTS in the "Back up data" section in [Step 2. 
Migrate full data](#step-2-migrate-full-data))
     [data-sources.downstream]
     host = "172.16.6.125" # Replace the value with the IP address of your downstream cluster
     port = 4000
@@ -230,23 +225,34 @@ After setting up the environment, you can use the backup and restore functions o
 
     In this command, the parameters are as follows:
 
-    - --pd: PD address of the upstream cluster
-    - --sink-uri: URI of the downstream cluster
-    - --changefeed-id: changefeed ID, must be in the format of a regular expression, ^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$
-    - --start-ts: start timestamp of the changefeed, must be the backup time (or BackupTS mentioned in the previous step)
+    - `--pd`: PD address of the upstream cluster
+    - `--sink-uri`: URI of the downstream cluster
+    - `--changefeed-id`: changefeed ID, must be in the format of a regular expression, `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`
+    - `--start-ts`: start timestamp of the changefeed, must be the backup time (or BackupTS in the "Back up data" section in [Step 2. Migrate full data](#step-2-migrate-full-data))
 
     For more information about the changefeed configurations, see [Task configuration file](/ticdc/manage-ticdc.md#task-configuration-file).
 
 3. Enable GC.
 
-    In incremental migration using TiCDC, GC only removes history data that is replicated. Therefore, after creating a changefeed, you need to run the following command to enable GC. For details, see [What is the complete behavior of TiCDC garbage collection (GC) safepoint?](/ticdc/ticdc-faq.md#what-is-the-complete-behavior-of-ticdc-garbage-collection-gc-safepoint).
+    In incremental migration using TiCDC, GC only removes history data that is replicated. Therefore, after creating a changefeed, you need to run the following command to enable GC. For details, see [What is the complete behavior of TiCDC garbage collection (GC) safepoint?](/ticdc/ticdc-faq.md#what-is-the-complete-behavior-of-ticdc-garbage-collection-gc-safepoint).
 
-    {{< copyable "sql" >}}
+    To enable GC, run the following command:
 
     ```sql
     MySQL [test]> SET GLOBAL tidb_gc_enable=TRUE;
+    ```
+
+    ```
     Query OK, 0 rows affected (0.01 sec)
+    ```
+
+    To verify that the change takes effect, query the value of `tidb_gc_enable`:
+
+    ```sql
     MySQL [test]> SELECT @@global.tidb_gc_enable;
+    ```
+
+    ```
     +-------------------------+
     | @@global.tidb_gc_enable |
     +-------------------------+
@@ -257,18 +263,19 @@ After setting up the environment, you can use the backup and restore functions o
 
 ## Step 4. Switch services to the new TiDB cluster
 
-After creating a changefeed, data written to the upstream cluster is replicated to the downstream cluster with low latency. You can migrate read stream to the downstream cluster gradually. Observe a period. If the downstream cluster is stable, you can switch write stream to the downstream cluster as well, which may include three steps:
+After creating a changefeed, data written to the upstream cluster is replicated to the downstream cluster with low latency. You can migrate read stream to the downstream cluster gradually. Observe the read stream for a period. If the downstream cluster is stable, you can switch write stream to the downstream cluster by performing the following steps:
 
 1. Stop write services in the upstream cluster. Make sure that all upstream data are replicated to downstream before stopping the changefeed.
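 
     One way to confirm that replication has caught up is to query the changefeed and inspect its checkpoint. The following command is a sketch that reuses the changefeed ID and PD address from this document; replication has caught up when the checkpoint time is close to the current time of the upstream cluster:
 
     ```shell
     # Query the changefeed and check the checkpoint fields in the output
     tiup cdc cli changefeed query -c "upstream-to-downstream" --pd=http://172.16.6.122:2379
     ```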
-    {{< copyable "shell-regular" >}}
-
     ```shell
     # Stop the changefeed from the upstream cluster to the downstream cluster
     tiup cdc cli changefeed pause -c "upstream-to-downstream" --pd=http://172.16.6.122:2379
     # View the changefeed status
     tiup cdc cli changefeed list
+    ```
+
+    ```
     [
       {
           "id": "upstream-to-downstream",
@@ -284,8 +291,6 @@ After creating a changefeed, data written to the upstream cluster is replicated
 
 2. Create a changefeed from downstream to upstream. You can leave `start-ts` unspecified so as to use the default setting, because the upstream and downstream data are consistent and there is no new data written to the cluster.
 
-    {{< copyable "shell-regular" >}}
-
     ```shell
     tiup cdc cli changefeed create --pd=http://172.16.6.125:2379 --sink-uri="mysql://root:@172.16.6.122:4000" --changefeed-id="downstream-to-upstream"
     ```
diff --git a/migrate-large-mysql-shards-to-tidb.md b/migrate-large-mysql-shards-to-tidb.md
index 0e29eb350104f..20eaa0cba93ac 100644
--- a/migrate-large-mysql-shards-to-tidb.md
+++ b/migrate-large-mysql-shards-to-tidb.md
@@ -98,7 +98,7 @@ The following table describes parameters in the command above. For more informat
 | `-r` or `--row` | Specifies the maximum number of rows in a single file. If you use this parameter, Dumpling enables the in-table concurrency to speed up the export and reduce the memory usage.|
 | `-F` | Specifies the maximum size of a single file. The unit is `MiB`. It is recommended to keep the value to 256 MiB. |
 | `-B` or `--database` | Specifies databases to be exported. |
-| `-f` or `--filter` | Sexport tables that match the filter pattern. For the filter syntax, see [table-filter](/table-filter.md) |
+| `-f` or `--filter` | Exports tables that match the filter pattern. For the filter syntax, see [table-filter](/table-filter.md). |
 
 Ensure that there is enough free space in `${data-path}`. It is strongly recommended to use the `-F` option to avoid interruptions in the backup process due to oversized single tables.
 
@@ -219,7 +219,7 @@ Follow these steps to start `tidb-lightning`:
 
 - View progress via the monitoring dashboard. For more information, see [TiDB Lightning Monitoring]( /tidb-lightning/monitor-tidb-lightning.md).
 - View the progress via the Web page. See [Web Interface](/tidb-lightning/tidb-lightning-web-interface.md).
 
-After the importing finishes, TiDB Lightning will exit automatically. To make sure that the data is imported successfully, check for `the whole procedure completed` among the last 5 lines in the log.
+After TiDB Lightning completes the import, it exits automatically. Check whether `tidb-lightning.log` contains `the whole procedure completed` in the last lines. If yes, the import is successful. If no, the import encounters an error. Address the error as instructed in the error message.
 
 > **Note:**
 >
@@ -265,8 +265,8 @@ The parameters are described as follows.
 
 |Parameter | Description |
 |- |- |
-|--master-addr | {advertise-addr} of any DM-master node in the cluster that dmctl connects to. For example: 172.16.10.71:8261|
-| operate-source create | Load data sources to DM clusters. |
+|`--master-addr` | `{advertise-addr}` of any DM-master node in the cluster that dmctl connects to. For example: 172.16.10.71:8261|
+| `operate-source create` | Load data sources to DM clusters. |
 
 Repeat the above steps until all MySQL upstream instances are added to the DM as data sources.
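For reference, registering one data source with dmctl typically looks like the following sketch. The source configuration file name `source1.yaml` is a placeholder for illustration; replace the file and the `--master-addr` value with the ones from your environment:

```shell
# source1.yaml describes one upstream MySQL instance (source-id, host, port, user, password)
tiup dmctl --master-addr 172.16.10.71:8261 operate-source create source1.yaml
```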
diff --git a/migrate-large-mysql-to-tidb.md b/migrate-large-mysql-to-tidb.md index 86acbddf1baa2..6f5fd51edd040 100644 --- a/migrate-large-mysql-to-tidb.md +++ b/migrate-large-mysql-to-tidb.md @@ -142,7 +142,7 @@ The target TiKV cluster must have enough disk space to store the imported data. - Check progress in [the monitoring dashboard](/tidb-lightning/monitor-tidb-lightning.md). - Check progress in [the TiDB Lightning web interface](/tidb-lightning/tidb-lightning-web-interface.md). -4. After TiDB Lightning completes the import, it exits automatically. If you find the last 5 lines of its log print `the whole procedure completed`, the import is successful. +4. After TiDB Lightning completes the import, it exits automatically. Check whether `tidb-lightning.log` contains `the whole procedure completed` in the last lines. If yes, the import is successful. If no, the import encounters an error. Address the error as instructed in the error message. > **Note:** > diff --git a/migrate-small-mysql-shards-to-tidb.md b/migrate-small-mysql-shards-to-tidb.md index b0bd9c02b9c16..ea9b65d468836 100644 --- a/migrate-small-mysql-shards-to-tidb.md +++ b/migrate-small-mysql-shards-to-tidb.md @@ -97,8 +97,8 @@ The parameters are described as follows. |Parameter | Description | |- |- | -|--master-addr | {advertise-addr} of any DM-master node in the cluster that dmctl connects to. For example: 172.16.10.71:8261| -|operate-source create | Load data sources to the DM clusters. | +|`--master-addr` | `{advertise-addr}` of any DM-master node in the cluster that dmctl connects to. For example: 172.16.10.71:8261| +|`operate-source create` | Load data sources to the DM clusters. | Repeat the above steps until all data sources are added to the DM cluster. @@ -195,8 +195,8 @@ tiup dmctl --master-addr ${advertise-addr} start-task task.yaml | Parameter | Description| |-|-| -|--master-addr| {advertise-addr} of any DM-master node in the cluster that dmctl connects to. For example: 172.16.10.71:8261 | -|start-task | Starts the data migration task. | +|`--master-addr`| `{advertise-addr}` of any DM-master node in the cluster that dmctl connects to. For example: 172.16.10.71:8261 | +|`start-task` | Starts the data migration task. | If the migration task fails to start, modify the configuration information according to the error information, and then run `start-task task.yaml` again to start the migration task. If you encounter problems, see [Handle Errors](/dm/dm-error-handling.md) and [FAQ](/dm/dm-faq.md). @@ -210,7 +210,7 @@ After starting the migration task, you can use `dmtcl tiup` to run `query-status tiup dmctl --master-addr ${advertise-addr} query-status ${task-name} ``` -If you encounter errors, use `query-status ` to view more detailed information. For details about the query results, task status and sub task status of the `query-status` command, see [TiDB Data Migration Query Status](/dm/dm-query-status.md). +If you encounter errors, use `query-status ${task-name}` to view more detailed information. For details about the query results, task status and sub task status of the `query-status` command, see [TiDB Data Migration Query Status](/dm/dm-query-status.md). ## Step 5. 
Monitor tasks and check logs (optional) diff --git a/migrate-small-mysql-to-tidb.md b/migrate-small-mysql-to-tidb.md index 2642abc7af416..7360bf7dd3b4b 100644 --- a/migrate-small-mysql-to-tidb.md +++ b/migrate-small-mysql-to-tidb.md @@ -48,7 +48,7 @@ The parameters used in the command above are described as follows: |Parameter |Description| | :- | :- | -|`--master-addr` |The {advertise-addr} of any DM-master node in the cluster where `dmctl` is to connect. For example, 172.16.10.71:8261. +|`--master-addr` |`{advertise-addr}` of any DM-master node in the cluster where `dmctl` is to connect. For example, 172.16.10.71:8261. |`operate-source create`|Load the data source to the DM cluster.| ## Step 2. Create the migration task @@ -113,7 +113,7 @@ The parameters used in the command above are described as follows: |Parameter|Description| | - | - | -|`--master-addr`| The {advertise-addr} of any DM-master node in the cluster where `dmctl` is to connect. For example: 172.16.10.71:8261. | +|`--master-addr`| `{advertise-addr}` of any DM-master node in the cluster where `dmctl` is to connect. For example: 172.16.10.71:8261. | |`start-task`| Start the migration task | If the task fails to start, after changing the configuration according to the returned result, you can run the `start-task task.yaml` command to restart the task. If you encounter problems, refer to [Handle Errors](/dm/dm-error-handling.md) and [FAQ](/dm/dm-faq.md). diff --git a/migrate-with-more-columns-downstream.md b/migrate-with-more-columns-downstream.md index c20e01002dfbc..65013e28dff26 100644 --- a/migrate-with-more-columns-downstream.md +++ b/migrate-with-more-columns-downstream.md @@ -77,7 +77,7 @@ In such cases, you can use the `binlog-schema` command to set a table schema for |Parameter |Description| |:-- |:---| - |`-master-addr` |Specifies the `${advertise-addr}` of any DM-master node in the cluster where dmctl is to be connected. `${advertise-addr}` indicates the address that DM-master advertises to the outside world.| + |`-master-addr` |Specifies `${advertise-addr}` of any DM-master node in the cluster where dmctl is to be connected. `${advertise-addr}` indicates the address that DM-master advertises to the outside world.| |`binlog-schema set`| Manually set the schema information.| |`-s` | Specifies the source. `${source-id}` indicates the source ID of MySQL data.| |`${task-name}`| Specifies the name of the migration task defined in the `task.yaml` configuration file of the data migration task.| diff --git a/replicate-between-primary-and-secondary-clusters.md b/replicate-between-primary-and-secondary-clusters.md index 1a78f7e21459e..1669ade17c852 100644 --- a/replicate-between-primary-and-secondary-clusters.md +++ b/replicate-between-primary-and-secondary-clusters.md @@ -18,7 +18,7 @@ To replicate incremental data from a running TiDB cluster to its secondary clust 1. Deploy TiDB clusters. - Deploy two TiDB clusters, one upstream and the other downstream by using tiup playground. For production environments, deploy the clusters by referring to [Deploy and Maintain an Online TiDB Cluster Using TiUP](/tiup/tiup-cluster.md). + Deploy two TiDB clusters, one upstream and the other downstream by using TiUP Playground. For production environments, deploy the clusters by referring to [Deploy and Maintain an Online TiDB Cluster Using TiUP](/tiup/tiup-cluster.md). 
In this document, we deploy the two clusters on two machines: @@ -26,8 +26,6 @@ To replicate incremental data from a running TiDB cluster to its secondary clust - Node B: 172.16.6.124, for deploying the downstream TiDB cluster - {{< copyable "shell-regular" >}} - ```shell # Create an upstream cluster on Node A tiup --tag upstream playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1 @@ -41,16 +39,12 @@ To replicate incremental data from a running TiDB cluster to its secondary clust By default, test databases are created in the newly deployed clusters. Therefore, you can use [sysbench](https://github.com/akopytov/sysbench#linux) to generate test data and simulate data in real scenarios. - {{< copyable "shell-regular" >}} - ```shell sysbench oltp_write_only --config-file=./tidb-config --tables=10 --table-size=10000 prepare ``` In this document, we use sysbench to run the `oltp_write_only` script. This script generates 10 tables in the upstream database, each with 10,000 rows. The tidb-config is as follows: - {{< copyable "shell-regular" >}} - ```shell mysql-host=172.16.6.122 # Replace it with the IP address of your upstream cluster mysql-port=4000 @@ -68,8 +62,6 @@ To replicate incremental data from a running TiDB cluster to its secondary clust In real scenarios, service data is continuously written to the upstream cluster. In this document, we use sysbench to simulate this workload. Specifically, run the following command to enable 10 workers to continuously write data to three tables, sbtest1, sbtest2, and sbtest3, with a total TPS not exceeding 100. - {{< copyable "shell-regular" >}} - ```shell sysbench oltp_write_only --config-file=./tidb-config --tables=3 run ``` @@ -78,8 +70,6 @@ To replicate incremental data from a running TiDB cluster to its secondary clust In full data backup, both the upstream and downstream clusters need to access backup files. It is recommended that you use [External storage](/br/backup-and-restore-storages.md#external-storages) to store backup files. In this example, Minio is used to simulate an S3-compatible storage service. - {{< copyable "shell-regular" >}} - ```shell wget https://dl.min.io/server/minio/release/linux-amd64/minio chmod +x minio @@ -103,8 +93,6 @@ To replicate incremental data from a running TiDB cluster to its secondary clust The link is as follows: - {{< copyable "shell-regular" >}} - ```shell s3://backup?access-key=minio&secret-access-key=miniostorage&endpoint=http://${HOST_IP}:6060&force-path-style=true ``` @@ -115,18 +103,31 @@ After setting up the environment, you can use the backup and restore functions o > **Note:** > -> If the versions of the upstream and downstream clusters are different, you should check [BR compatibility](/br/backup-and-restore-overview.md#before-you-use-br). In this document, we assume that the upstream and downstream clusters are the same version. +> - In production clusters, performing a backup with GC disabled might affect cluster performance. It is recommended that you back up data in off-peak hours, and set RATE_LIMIT to a proper value to avoid performance degradation. +> +> - If the versions of the upstream and downstream clusters are different, you should check [BR compatibility](/br/backup-and-restore-overview.md#before-you-use-br). In this document, we assume that the upstream and downstream clusters are the same version. 1. Disable GC. - To ensure that newly written data is not deleted during incremental migration, you should disable GC for the upstream cluster before backup. 
In this way, history data will not be deleted. + To ensure that newly written data is not deleted during incremental migration, you should disable GC for the upstream cluster before backup. In this way, history data is not deleted. - {{< copyable "sql" >}} + Run the following command to disable GC: ```sql MySQL [test]> SET GLOBAL tidb_gc_enable=FALSE; + ``` + + ``` Query OK, 0 rows affected (0.01 sec) + ``` + + To verify that the change takes effect, query the value of `tidb_gc_enable`: + + ```sql MySQL [test]> SELECT @@global.tidb_gc_enable; + ``` + + ``` +-------------------------+ | @@global.tidb_gc_enable | +-------------------------+ @@ -135,18 +136,15 @@ After setting up the environment, you can use the backup and restore functions o 1 row in set (0.00 sec) ``` - > **Note:** - > - > In production clusters, performing a backup with GC disabled might affect cluster performance. It is recommended that you back up data in off-peak hours, and set RATE_LIMIT to a proper value to avoid performance degradation. - 2. Back up data. Run the `BACKUP` statement in the upstream cluster to back up data: - {{< copyable "sql" >}} - ```sql MySQL [(none)]> BACKUP DATABASE * TO 's3://backup?access-key=minio&secret-access-key=miniostorage&endpoint=http://${HOST_IP}:6060&force-path-style=true' RATE_LIMIT = 120 MB/SECOND; + ``` + + ``` +----------------------+----------+--------------------+---------------------+---------------------+ | Destination | Size | BackupTS | Queue Time | Execution Time | +----------------------+----------+--------------------+---------------------+---------------------+ @@ -161,10 +159,11 @@ After setting up the environment, you can use the backup and restore functions o Run the `RESTORE` command in the downstream cluster to restore data: - {{< copyable "sql" >}} - ```sql mysql> RESTORE DATABASE * FROM 's3://backup?access-key=minio&secret-access-key=miniostorage&endpoint=http://${HOST_IP}:6060&force-path-style=true'; + ``` + + ``` +----------------------+----------+--------------------+---------------------+---------------------+ | Destination | Size | BackupTS | Queue Time | Execution Time | +----------------------+----------+--------------------+---------------------+---------------------+ @@ -173,20 +172,16 @@ After setting up the environment, you can use the backup and restore functions o 1 row in set (41.85 sec) ``` -4. (Optional) Check data. +4. (Optional) Validate data. Use [sync-diff-inspector](/sync-diff-inspector/sync-diff-inspector-overview.md) to check data consistency between upstream and downstream at a certain time. The preceding `BACKUP` output shows that the upstream cluster finishes backup at 431434047157698561. The preceding `RESTORE` output shows that the downstream finishes restoration at 431434141450371074. - {{< copyable "shell-regular" >}} - ```shell sync_diff_inspector -C ./config.yaml ``` For details about how to configure the sync-diff-inspector, see [Configuration file description](/sync-diff-inspector/sync-diff-inspector-overview.md#configuration-file-description). In this document, the configuration is as follows: - {{< copyable "shell-regular" >}} - ```shell # Diff Configuration. ######################### Global config ######################### @@ -227,8 +222,6 @@ After setting up the environment, you can use the backup and restore functions o Create a changefeed configuration file `changefeed.toml`. 
- {{< copyable "shell-regular" >}} - ```shell [consistent] # Consistency level, eventual means enabling consistent replication @@ -239,17 +232,15 @@ After setting up the environment, you can use the backup and restore functions o In the upstream cluster, run the following command to create a changefeed from the upstream to the downstream clusters: - {{< copyable "shell-regular" >}} - ```shell tiup cdc cli changefeed create --pd=http://172.16.6.122:2379 --sink-uri="mysql://root:@172.16.6.125:4000" --changefeed-id="primary-to-secondary" --start-ts="431434047157698561" ``` In this command, the parameters are as follows: - - --pd: PD address of the upstream cluster - - --sink-uri: URI of the downstream cluster - - --start-ts: start timestamp of the changefeed, must be the backup time (or BackupTS mentioned in [Step 2. Migrate full data](#step-2-migrate-full-data)) + - `--pd`: PD address of the upstream cluster + - `--sink-uri`: URI of the downstream cluster + - `--start-ts`: start timestamp of the changefeed, must be the backup time (or BackupTS mentioned in [Step 2. Migrate full data](#step-2-migrate-full-data)) For more information about the changefeed configurations, see [Task configuration file](/ticdc/manage-ticdc.md#task-configuration-file). @@ -257,12 +248,23 @@ After setting up the environment, you can use the backup and restore functions o In incremental migration using TiCDC, GC only removes history data that is replicated. Therefore, after creating a changefeed, you need to run the following command to enable GC. For details, see [What is the complete behavior of TiCDC garbage collection (GC) safepoint?](/ticdc/ticdc-faq.md#what-is-the-complete-behavior-of-ticdc-garbage-collection-gc-safepoint). - {{< copyable "sql" >}} + To enable GC, run the following command: ```sql MySQL [test]> SET GLOBAL tidb_gc_enable=TRUE; + ``` + + ``` Query OK, 0 rows affected (0.01 sec) + ``` + + To verify that the change takes effect, query the value of `tidb_gc_enable`: + + ```sql MySQL [test]> SELECT @@global.tidb_gc_enable; + ``` + + ``` +-------------------------+ | @@global.tidb_gc_enable | +-------------------------+ @@ -279,15 +281,13 @@ Create a disastrous event in the upstream cluster while it is running. For examp Normally, TiCDC concurrently writes transactions to downstream to increase throughout. When a changefeed is interrupted unexpectedly, the downstream may not have the latest data as it is in the upstream. To address inconsistency, run the following command to ensure that the downstream data is consistent with the upstream data. -{{< copyable "shell-regular" >}} - ```shell tiup cdc redo apply --storage "s3://redo?access-key=minio&secret-access-key=miniostorage&endpoint=http://172.16.6.123:6060&force-path-style=true" --tmp-dir /tmp/redo --sink-uri "mysql://root:@172.16.6.124:4000" ``` -- --storage: Location and credential of the redo log in S3 -- --tmp-dir: Cache directory of the redo log downloaded from S3 -- --sink-uri: URI of the downstream cluster +- `--storage`: Location and credential of the redo log in S3 +- `--tmp-dir`: Cache directory of the redo log downloaded from S3 +- `--sink-uri`: URI of the downstream cluster ## Step 6. Recover the primary cluster and its services @@ -295,16 +295,12 @@ After the previous step, the downstream (secondary) cluster has data that is con 1. Deploy a new TiDB cluster on Node A as the new primary cluster. 
- {{< copyable "shell-regular" >}} - ```shell tiup --tag upstream playground v5.4.0 --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1 ``` 2. Use BR to back up and restore data fully from the secondary cluster to the primary cluster. - {{< copyable "shell-regular" >}} - ```shell # Back up full data of the secondary cluster tiup br --pd http://172.16.6.124:2379 backup full --storage ./backup @@ -314,8 +310,6 @@ After the previous step, the downstream (secondary) cluster has data that is con 3. Create a new changefeed to back up data from the primary cluster to the secondary cluster. - {{< copyable "shell-regular" >}} - ```shell # Create a changefeed tiup cdc cli changefeed create --pd=http://172.16.6.122:2379 --sink-uri="mysql://root:@172.16.6.125:4000" --changefeed-id="primary-to-secondary" diff --git a/replicate-data-to-kafka.md b/replicate-data-to-kafka.md index a9288e41cbfe2..7eccaf7fa9792 100644 --- a/replicate-data-to-kafka.md +++ b/replicate-data-to-kafka.md @@ -1,106 +1,167 @@ --- -title: Replicate data from TiDB to Apache Kafka -summary: Learn how to replicate data from TiDB to Apache Kafka +title: Integrate Data with Apache Kafka and Apache Flink +summary: Learn how to replicate TiDB data to Apache Kafka and Apache Flink using TiCDC. --- -# Replicate Data from TiDB to Apache Kafka +# Integrate Data with Apache Kafka and Apache Flink -This document describes how to replicate data from TiDB to Apache Kafka by using [TiCDC](/ticdc/ticdc-overview.md), which includes the following steps: +This document describes how to replicate TiDB data to Apache Kafka and Apache Flink using [TiCDC](/ticdc/ticdc-overview.md). The organization of this document is as follows: -- Deploy a TiCDC cluster and a Kafka cluster. -- Create a changefeed with Kafka as the sink. -- Write data to the TiDB cluster by using go-tpc. On Kafka console consumer, check that the data is replicated to a specified Kafka topic. +1. Quickly deploy a TiDB cluster with TiCDC included, and create a Kafka cluster and a Flink cluster. +2. Create a changefeed that replicates data from TiDB to Kafka. +3. Write data to TiDB using go-tpc. +4. Observe data on Kafka console consumer and check that the data is replicated to a specified Kafka topic. +5. (Optional) Configure the Flink cluster to consume Kafka data. -These steps are performed in a lab environment. You can also deploy a cluster for a production environment by referring to these steps. +The preceding steps are performed in a lab environment. You can also deploy a cluster in a production environment by referring to these steps. ## Step 1. Set up the environment -1. Deploy a TiCDC cluster. +1. Deploy a TiDB cluster with TiCDC included. - You can deploy a TiCDC quickly by running the `tiup playground` command. - - {{< copyable "shell-regular" >}} + In a lab or testing environment, you can deploy a TiDB cluster with TiCDC included quickly by using TiUP Playground. ```shell tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1 - # View cluster status tiup status ``` - In a production environment, you can deploy a TiCDC as instructed in [Deploy TiCDC](/ticdc/deploy-ticdc.md). + If TiUP is not installed yet, refer to [Install TiUP](/tiup/tiup-overview.md#install-tiup). In a production environment, you can deploy a TiCDC as instructed in [Deploy TiCDC](/ticdc/deploy-ticdc.md). -2. Deploy a Kafka cluster. +2. Create a Kafka cluster. - - To quickly deploy a Kafka cluster, refer to [Apache Kakfa Quickstart](https://kafka.apache.org/quickstart). 
- To deploy a Kafka cluster in production environments, refer to [Running Kafka in Production](https://docs.confluent.io/platform/current/kafka/deployment.html).
+    - Lab environment: refer to [Apache Kafka Quickstart](https://kafka.apache.org/quickstart) to start a Kafka cluster.
+    - Production environment: refer to [Running Kafka in Production](https://docs.confluent.io/platform/current/kafka/deployment.html) to deploy a Kafka production cluster.
 
-## Step 2. Create a changefeed
+3. (Optional) Create a Flink cluster.
 
-Use tiup ctl to create a changefeed with Kafka as the downstream node.
+    - Lab environment: refer to [Apache Flink First steps](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/try-flink/local_installation/) to start a Flink cluster.
+    - Production environment: refer to [Apache Flink Deployment](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/) to deploy a Flink production cluster.
 
-{{< copyable "shell-regular" >}}
+## Step 2. Create a Kafka changefeed
 
-```shell
-tiup ctl cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" --changefeed-id="kafka-changefeed"
-```
+1. Create a changefeed configuration file.
 
-If the command is executed successfully, information about the changefeed is displayed, such as the changefeed ID and the sink URI.
+    As required by Flink, incremental data of each table must be sent to an independent topic, and a partition must be dispatched for each event based on the primary key value. Therefore, you need to create a changefeed configuration file `changefeed.conf` with the following contents:
 
-{{< copyable "shell-regular" >}}
+    ```
+    [sink]
+    dispatchers = [
+    {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
+    ]
+    ```
 
-```shell
-Create changefeed successfully!
-ID: kafka-changefeed
-Info: {"sink-uri":"kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json","opts":{},"create-time":"2022-04-06T14:45:10.824475+08:00","start-ts":432335096583028737,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json","column-selectors":null},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.1.0-master"}
- ```
+    For detailed description of `dispatchers` in the configuration file, see [Customize the rules for Topic and Partition dispatchers of Kafka Sink](/ticdc/manage-ticdc.md#customize-the-rules-for-topic-and-partition-dispatchers-of-kafka-sink).
 
-If the command does not return any information, you should check network connectivity from the server where the command is executed to the target Kafka cluster.
+2. Create a changefeed to replicate incremental data to Kafka:
 
-In production environments, a Kafka cluster has multiple broker nodes. Therefore, you can add the addresses of multiple brokers to the sink UIR. This improves stable access to the Kafka cluster. When a Kafka cluster is faulty, the changefeed still works.
Suppose that a Kafka cluster has three broker nodes, with IP addresses being 127.0.0.1:9092, 127.0.0.2:9092, and 127.0.0.3:9092, respectively. You can create a changefeed with the following sink URI.
+
+    ```shell
+    tiup ctl:v6.2.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" --changefeed-id="kafka-changefeed" --config="changefeed.conf"
+    ```
+
+    - If the changefeed is successfully created, changefeed information, such as changefeed ID, is displayed, as shown below:
-
-{{< copyable "shell-regular" >}}
+
+        ```shell
+        Create changefeed successfully!
+        ID: kafka-changefeed
+        Info: {... changefeed info json struct ...}
+        ```
-
-```shell
-tiup ctl cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576"
-```
+
+    - If no result is returned after you run the command, check the network connectivity between the server where you run the command and the Kafka machine specified in the sink URI.
 
-After executing the preceding command, run the following command to check the status of the changefeed.
+    In a production environment, a Kafka cluster has multiple broker nodes. Therefore, you can add the addresses of multiple brokers to the sink URI. This ensures stable access to the Kafka cluster. When the Kafka cluster is down, the changefeed still works. Suppose that a Kafka cluster has three broker nodes, with IP addresses being 127.0.0.1:9092, 127.0.0.2:9092, and 127.0.0.3:9092, respectively. You can create a changefeed with the following sink URI.
 
-{{< copyable "shell-regular" >}}
+    ```shell
+    tiup ctl:v6.2.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" --config="changefeed.conf"
+    ```
 
-```shell
-tiup ctl cdc changefeed list --pd="http://127.0.0.1:2379"
-```
+
+3. After creating the changefeed, run the following command to check the changefeed status:
 
-You can manage the status of a changefeed as instructed in [Manage replication tasks (`changefeed`)](/ticdc/manage-ticdc.md#manage-replication-tasks-changefeed).
+    ```shell
+    tiup ctl:v6.2.0 cdc changefeed list --pd="http://127.0.0.1:2379"
+    ```
 
-## Step 3. Generate data changes in the TiDB cluster
+    You can refer to [Manage TiCDC Cluster and Replication Tasks](/ticdc/manage-ticdc.md) to manage the changefeed.
 
-After a changefeed is created, once there is any event change in the TiDB cluster, such as an `INSERT`, `UPDATE`, or `DELETE` operation, data change is generated in TiCDC. Then TiCDC replicates the data change to the sink specified in the changefeed. In this document, the sink is Kafka and the data change is written to the specified Kafka topic.
+## Step 3. Write data to generate change logs
 
-1. Simulate service workload.
+After the preceding steps are done, TiCDC sends change logs of incremental data in the TiDB cluster to Kafka. This section describes how to write data into TiDB to generate change logs.
 
-    In the lab environment, you can use `go-tpc` to write data to the TiDB cluster, which is used as the source of the changefeed. Specifically, run the following command to create a database `tpcc` in the upstream TiDB cluster. Then use `TiUP bench` to write data to this new database.
+
+1. Simulate service workload.
- {{< copyable "shell-regular" >}} + To generate change logs in a lab environment, you can use go-tpc to write data to the TiDB cluster. Specifically, run the following command to use TiUP bench to create a `tpcc` database and write data to this new database. ```shell - create database tpcc; tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s ``` - For more details about `go-tpc`, refer to [How to Run TPC-C Test on TiDB](/benchmark/benchmark-tidb-using-tpcc.md). + For more details about go-tpc, refer to [How to Run TPC-C Test on TiDB](/benchmark/benchmark-tidb-using-tpcc.md). -2. Consume data change from Kafka. +2. Consume data in the Kafka topic. - When a changefeed works normally, it writes data to the Kafka topic. You can run `kafka-console-consumer.sh` to view the written data. - - {{< copyable "shell-regular" >}} + When a changefeed works normally, it writes data to the Kafka topic. Run `kafka-console-consumer.sh`. You can see that data is successfully written to the Kafka topic. ```shell ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}` ``` - In production environments, you need to develop Kafka Consumer to consume the data in the Kafka topic. +At this time, incremental data of the TiDB database is successfully replicated to Kafka. Next, you can use Flink to consume Kafka data. Alternatively, you can develop a Kafka consumer client yourself for specific service scenarios. + +## (Optional) Step 4. Configure Flink to consume Kafka data + +1. Install a Flink Kafka connector. + + In the Flink ecosystem, a Flink Kafka connector is used to consume Kafka data and output data to Flink. However, Flink Kafka connectors are not automatically installed. To use it, add a Flink Kafka connector and its dependencies to the Flink installation directory after installing Flink. Specifically, download the following jar files to the `lib` directory of the Flink installation directory. If you have already run the Flink cluster, restart it to load the new plugin. + + - [flink-connector-kafka-1.15.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka/1.15.0/flink-connector-kafka-1.15.0.jar) + - [flink-sql-connector-kafka-1.15.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.0/flink-sql-connector-kafka-1.15.0.jar) + - [kafka-clients-3.2.0.jar](https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar) + +2. Create a table. + + In the directory where Flink is installed, run the following command to start the Flink SQL client: + + ```shell + [root@flink flink-1.15.0]# ./bin/sql-client.sh + ``` + + Then, run the following command to create a table named `tpcc_orders`. + + ```sql + CREATE TABLE tpcc_orders ( + o_id INTEGER, + o_d_id INTEGER, + o_w_id INTEGER, + o_c_id INTEGER, + o_entry_d STRING, + o_carrier_id INTEGER, + o_ol_cnt INTEGER, + o_all_local INTEGER + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'tidb_tpcc_orders', + 'properties.bootstrap.servers' = '127.0.0.1:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'canal-json', + 'scan.startup.mode' = 'earliest-offset', + 'properties.auto.offset.reset' = 'earliest' + ) + ``` + + Replace `topic` and `properties.bootstrap.servers` with the actual values in the environment. + +3. Query data of the table. 
+ + Run the following command to query data of the `tpcc_orders` table: + + ```sql + SELECT * FROM tpcc_orders; + ``` + + After this command is executed, you can see that there is new data in the table, as shown in the following figure. + + ![SQL query result](/media/integrate/sql-query-result.png) + +Data integration with Kafka is done. diff --git a/ticdc/integrate-confluent-using-ticdc.md b/ticdc/integrate-confluent-using-ticdc.md index d559c104aa4a9..5f67d77bc33b0 100644 --- a/ticdc/integrate-confluent-using-ticdc.md +++ b/ticdc/integrate-confluent-using-ticdc.md @@ -22,7 +22,7 @@ The preceding steps are performed in a lab environment. You can also deploy a cl 1. Deploy a TiDB cluster with TiCDC included. - In a lab or testing environment, you can deploy a TiDB cluster with TiCDC quickly by using TiUP Playground. + In a lab or testing environment, you can deploy a TiDB cluster with TiCDC included quickly by using TiUP Playground. ```shell tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
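    # The following status check is an assumption based on the earlier examples in
    # this document; it lists the components started by TiUP Playground.
    tiup status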