1 change: 1 addition & 0 deletions TOC.md
@@ -533,6 +533,7 @@
- [Manage Changefeeds](/ticdc/ticdc-manage-changefeed.md)
- [Log Filter](/ticdc/ticdc-filter.md)
- [Bidirectional Replication](/ticdc/ticdc-bidirectional-replication.md)
- [Data Integrity Validation for Single-Row Data](/ticdc/ticdc-integrity-check.md)
- Monitor and Alert
- [Monitoring Metrics Summary](/ticdc/ticdc-summary-monitor.md)
- [Monitoring Metrics Details](/ticdc/monitor-ticdc.md)
66 changes: 66 additions & 0 deletions functions-and-operators/tidb-functions.md
@@ -7,6 +7,25 @@ summary: Learn about the usage of TiDB specific functions.

The following functions are TiDB extensions, and are not present in MySQL:

<CustomContent platform="tidb">

| Function name | Function description |
| :-------------- | :------------------------------------- |
| `TIDB_BOUNDED_STALENESS()` | The `TIDB_BOUNDED_STALENESS` function instructs TiDB to read the newest data available within the specified time range. See also: [Read Historical Data Using the `AS OF TIMESTAMP` Clause](/as-of-timestamp.md). |
| [`TIDB_DECODE_KEY(str)`](#tidb_decode_key) | The `TIDB_DECODE_KEY` function can be used to decode a TiDB-encoded key entry into a JSON structure containing `_tidb_rowid` and `table_id`. These encoded keys can be found in some system tables and in logging outputs. |
| [`TIDB_DECODE_PLAN(str)`](#tidb_decode_plan) | The `TIDB_DECODE_PLAN` function can be used to decode a TiDB execution plan. |
| `TIDB_IS_DDL_OWNER()` | The `TIDB_IS_DDL_OWNER` function can be used to check whether or not the TiDB instance you are connected to is the one that is the DDL Owner. The DDL Owner is the TiDB instance that is tasked with executing DDL statements on behalf of all other nodes in the cluster. |
| [`TIDB_PARSE_TSO(num)`](#tidb_parse_tso) | The `TIDB_PARSE_TSO` function can be used to extract the physical timestamp from a TiDB TSO timestamp. See also: [`tidb_current_ts`](/system-variables.md#tidb_current_ts). |
| [`TIDB_VERSION()`](#tidb_version) | The `TIDB_VERSION` function returns the TiDB version with additional build information. |
| [`TIDB_DECODE_SQL_DIGESTS(digests, stmtTruncateLength)`](#tidb_decode_sql_digests) | The `TIDB_DECODE_SQL_DIGESTS()` function is used to query the normalized SQL statements (a form without formats and arguments) corresponding to the set of SQL digests in the cluster. |
| `VITESS_HASH(str)` | The `VITESS_HASH` function returns the hash of a string that is compatible with Vitess' `HASH` function. This is intended to help the data migration from Vitess. |
| `TIDB_SHARD()` | The `TIDB_SHARD` function can be used to create a shard index to scatter the index hotspot. A shard index is an expression index with a `TIDB_SHARD` function as the prefix.|
| `TIDB_ROW_CHECKSUM()` | The `TIDB_ROW_CHECKSUM` function is used to query the checksum value of a row. This function can only be used in `SELECT` statements within the FastPlan process. That is, you can query it using statements such as `SELECT TIDB_ROW_CHECKSUM() FROM t WHERE id = ?` or `SELECT TIDB_ROW_CHECKSUM() FROM t WHERE id IN (?, ?, ...)`. See also: [Data integrity validation for single-row data](/ticdc/ticdc-integrity-check.md). |

</CustomContent>

<CustomContent platform="tidb-cloud">

| Function name | Function description |
| :-------------- | :------------------------------------- |
| `TIDB_BOUNDED_STALENESS()` | The `TIDB_BOUNDED_STALENESS` function instructs TiDB to read the newest data available within the specified time range. See also: [Read Historical Data Using the `AS OF TIMESTAMP` Clause](/as-of-timestamp.md). |
@@ -18,6 +37,9 @@ The following functions are TiDB extensions, and are not present in MySQL:
| [`TIDB_DECODE_SQL_DIGESTS(digests, stmtTruncateLength)`](#tidb_decode_sql_digests) | The `TIDB_DECODE_SQL_DIGESTS()` function is used to query the normalized SQL statements (a form without formats and arguments) corresponding to the set of SQL digests in the cluster. |
| `VITESS_HASH(str)` | The `VITESS_HASH` function returns the hash of a string that is compatible with Vitess' `HASH` function. This is intended to help the data migration from Vitess. |
| `TIDB_SHARD()` | The `TIDB_SHARD` function can be used to create a shard index to scatter the index hotspot. A shard index is an expression index with a `TIDB_SHARD` function as the prefix.|
| `TIDB_ROW_CHECKSUM()` | The `TIDB_ROW_CHECKSUM` function is used to query the checksum value of a row. This function can only be used in `SELECT` statements within the FastPlan process. That is, you can query it using statements such as `SELECT TIDB_ROW_CHECKSUM() FROM t WHERE id = ?` or `SELECT TIDB_ROW_CHECKSUM() FROM t WHERE id IN (?, ?, ...)`. See also: [Data integrity validation for single-row data](https://docs.pingcap.com/tidb/stable/ticdc-integrity-check). |

</CustomContent>

## Examples

@@ -297,3 +319,47 @@ TIDBShardExpr ::=
```sql
CREATE TABLE test(id INT PRIMARY KEY CLUSTERED, a INT, b INT, UNIQUE KEY uk((tidb_shard(a)), a));
```

### TIDB_ROW_CHECKSUM

The `TIDB_ROW_CHECKSUM` function is used to query the checksum value of a row. This function can only be used in `SELECT` statements within the FastPlan process. That is, you can query it using statements such as `SELECT TIDB_ROW_CHECKSUM() FROM t WHERE id = ?` or `SELECT TIDB_ROW_CHECKSUM() FROM t WHERE id IN (?, ?, ...)`.

The synopsis is as follows:

```ebnf+diagram
TableStmt ::=
"TIDB_ROW_CHECKSUM()"
```

The following example shows how to use the `TIDB_ROW_CHECKSUM` function to query the checksum value of row data.

First, enable the checksum feature for single-row data in TiDB (controlled by the system variable [`tidb_enable_row_level_checksum`](/system-variables.md#tidb_enable_row_level_checksum-new-in-v710)) by running the following statement:

```sql
SET GLOBAL tidb_enable_row_level_checksum = ON;
```

Create table `t` and insert data:

```sql
USE test;
CREATE TABLE t (id INT PRIMARY KEY, k INT, c CHAR(1));
INSERT INTO t VALUES (1, 10, 'a');
```

The following statement shows how to query the checksum value of the row where `id = 1` in table `t`:

```sql
SELECT *, TIDB_ROW_CHECKSUM() FROM t WHERE id = 1;
```

The output is as follows:

```sql
+----+------+------+---------------------+
| id | k | c | TIDB_ROW_CHECKSUM() |
+----+------+------+---------------------+
| 1 | 10 | a | 3813955661 |
+----+------+------+---------------------+
1 row in set (0.000 sec)
```
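
If you query the checksum from an application, `TIDB_ROW_CHECKSUM()` behaves like any other selected expression. The following is a minimal Go sketch; the DSN (a local TiDB instance on port 4000 with the `test` database) and the use of the `go-sql-driver/mysql` driver are assumptions to adapt to your environment:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	// TiDB speaks the MySQL protocol, so a MySQL driver works here.
	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Assumed DSN: adjust the user, password, host, and port for your cluster.
	db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Filtering on the primary key keeps the query on the FastPlan path,
	// which TIDB_ROW_CHECKSUM() requires.
	var (
		id, k    int
		c        string
		checksum uint64
	)
	row := db.QueryRow("SELECT *, TIDB_ROW_CHECKSUM() FROM t WHERE id = ?", 1)
	if err := row.Scan(&id, &k, &c, &checksum); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("id=%d k=%d c=%s checksum=%d\n", id, k, c, checksum)
}
```
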
19 changes: 19 additions & 0 deletions system-variables.md
@@ -2097,6 +2097,25 @@ Query OK, 0 rows affected (0.09 sec)
- Default value: `ON`
- This variable is used to control whether to enable the support for window functions. Note that window functions may use reserved keywords. This might cause SQL statements that could be executed normally to fail to parse after you upgrade TiDB. In this case, you can set `tidb_enable_window_function` to `OFF`.

### `tidb_enable_row_level_checksum` <span class="version-mark">New in v7.1.0</span>

- Scope: GLOBAL
- Persists to cluster: Yes
- Type: Boolean
- Default value: `OFF`

<CustomContent platform="tidb">

- This variable is used to control whether to enable the [TiCDC data integrity validation for single-row data](/ticdc/ticdc-integrity-check.md) feature.

</CustomContent>

<CustomContent platform="tidb-cloud">

- This variable is used to control whether to enable the [TiCDC data integrity validation for single-row data](https://docs.pingcap.com/tidb/stable/ticdc-integrity-check) feature.

</CustomContent>

### tidb_enforce_mpp <span class="version-mark">New in v5.1</span>

- Scope: SESSION
13 changes: 9 additions & 4 deletions ticdc/ticdc-changefeed-config.md
@@ -178,19 +178,24 @@ include-commit-ts = false
# Specifies the replication consistency configurations for a changefeed when using the redo log. For more information, see https://docs.pingcap.com/tidb/stable/ticdc-sink-to-mysql#eventually-consistent-replication-in-disaster-scenarios.
# Note: The consistency-related configuration items only take effect when the downstream is a database and the redo log feature is enabled.
[sink.consistent]
# The data consistency level. Available options are "none" and "eventual". "none" means that the redo log is disabled.
# The default value is "none".
level = "none"
# The max redo log size in MB.
# The default value is 64.
max-log-size = 64
# The flush interval for redo log. The default value is 2000 milliseconds.
flush-interval = 2000
# The storage URI of the redo log.
# The default value is empty.
storage = ""
storage = ""
# Specifies whether to store the redo log in a file.
# The default value is false.
use-file-backend = false

[integrity]
# Whether to enable the checksum validation for single-row data. The default value is "none", which disables the feature. Value options are "none" and "correctness".
integrity-check-level = "none"
# Specifies the log level of the changefeed when the checksum validation for single-row data fails. The default value is "warn". Value options are "warn" and "error".
corruption-handle-level = "warn"
```
104 changes: 104 additions & 0 deletions ticdc/ticdc-integrity-check.md
@@ -0,0 +1,104 @@
---
title: TiCDC Data Integrity Validation for Single-Row Data
summary: Introduce the implementation principle and usage of the TiCDC data integrity validation feature.
---

# TiCDC Data Integrity Validation for Single-Row Data

Starting from v7.1.0, TiCDC introduces the data integrity validation feature, which uses a checksum algorithm to validate the integrity of single-row data. This feature helps verify whether any error occurs in the process of writing data from TiDB, replicating it through TiCDC, and then writing it to a Kafka cluster. The data integrity validation feature only supports changefeeds that use Kafka as the downstream and currently supports the Avro protocol.

## Implementation principles

After you enable the checksum integrity validation feature for single-row data, TiDB uses the CRC32 algorithm to calculate the checksum of a row and writes it to TiKV along with the data. TiCDC reads the data from TiKV and recalculates the checksum using the same algorithm. If the two checksums are equal, it indicates that the data is consistent during the transmission from TiDB to TiCDC.

TiCDC then encodes the data into a specific format and sends it to Kafka. After the Kafka Consumer reads data, it calculates a new checksum using the same algorithm as TiDB. If the new checksum is equal to the checksum in the data, it indicates that the data is consistent during the transmission from TiCDC to the Kafka Consumer.

For more information about the algorithm of the checksum, see [Algorithm for checksum calculation](#algorithm-for-checksum-calculation).
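
Conceptually, the consumer-side check is a CRC32 comparison over the encoded column values. The following Go sketch illustrates that comparison only; the function name and the way the expected checksum is obtained from a message are assumptions, and the IEEE polynomial is assumed to match the one TiDB uses (confirm against the actual decoder linked at the end of this document):

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// verifyRowChecksum recomputes the checksum over the encoded column
// values (already sorted by column ID) and compares it with the checksum
// carried in the Kafka message. The per-column []byte encoding is assumed
// to follow the rules in "Algorithm for checksum calculation" below.
func verifyRowChecksum(encodedColumns [][]byte, expected uint32) error {
	var checksum uint32
	for _, col := range encodedColumns {
		checksum = crc32.Update(checksum, crc32.IEEETable, col)
	}
	if checksum != expected {
		return fmt.Errorf("checksum mismatch: got %d, want %d", checksum, expected)
	}
	return nil
}

func main() {
	// Toy example: two pre-encoded columns and their expected checksum.
	cols := [][]byte{{0x01}, {0x02}}
	want := crc32.Update(crc32.Update(0, crc32.IEEETable, cols[0]), crc32.IEEETable, cols[1])
	if err := verifyRowChecksum(cols, want); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("checksum OK")
	}
}
```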

## Enable the feature

TiCDC disables data integrity validation by default. To enable it, perform the following steps:

1. Enable the checksum integrity validation feature for single-row data in the upstream TiDB cluster by setting the [`tidb_enable_row_level_checksum`](/system-variables.md#tidb_enable_row_level_checksum-new-in-v710) system variable:

```sql
SET GLOBAL tidb_enable_row_level_checksum = ON;
```

This configuration only takes effect for newly created sessions, so you need to reconnect to TiDB.

2. In the [configuration file](/ticdc/ticdc-changefeed-config.md#changefeed-configuration-parameters) specified by the `--config` parameter when you create a changefeed, add the following configurations:

```toml
[integrity]
integrity-check-level = "correctness"
corruption-handle-level = "warn"
```

3. When using Avro as the data encoding format, you need to set [`enable-tidb-extension=true`](/ticdc/ticdc-sink-to-kafka.md#configure-sink-uri-for-kafka) in the [`sink-uri`](/ticdc/ticdc-sink-to-kafka.md#configure-sink-uri-for-kafka). To prevent numerical precision loss during network transmission, which can cause checksum validation failures, you also need to set [`avro-decimal-handling-mode=string`](/ticdc/ticdc-sink-to-kafka.md#configure-sink-uri-for-kafka) and [`avro-bigint-unsigned-handling-mode=string`](/ticdc/ticdc-sink-to-kafka.md#configure-sink-uri-for-kafka). The following is an example:

```shell
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-checksum" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```

With the preceding configuration, each message written to Kafka by the changefeed will include the corresponding data's checksum. You can verify data consistency based on these checksum values.

> **Note:**
>
> For existing changefeeds, if `avro-decimal-handling-mode` and `avro-bigint-unsigned-handling-mode` are not set, enabling the checksum validation feature might cause schema compatibility issues. To resolve this issue, you can modify the compatibility type of the Schema Registry to `NONE`. For more details, see [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/fundamentals/avro.html#no-compatibility-checking).

## Disable the feature

TiCDC disables data integrity validation by default. To disable this feature after enabling it, perform the following steps:

1. Follow the `Pause Task -> Modify Configuration -> Resume Task` process described in [Update task configuration](/ticdc/ticdc-manage-changefeed.md#update-task-configuration), and either remove all `[integrity]` configurations from the configuration file specified by the `--config` parameter of the changefeed or set them back to the following default values:

```toml
[integrity]
integrity-check-level = "none"
corruption-handle-level = "warn"
```

2. Execute the following SQL statement in the upstream TiDB to disable the checksum integrity validation feature ([`tidb_enable_row_level_checksum`](/system-variables.md#tidb_enable_row_level_checksum-new-in-v710)):

```sql
SET GLOBAL tidb_enable_row_level_checksum = OFF;
```

The preceding configuration only takes effect for newly created sessions. After all clients writing to TiDB have reconnected, the messages written by the changefeed to Kafka will no longer include the checksum of the corresponding data.

## Algorithm for checksum calculation

The pseudocode for the checksum calculation algorithm is as follows:

```
fn checksum(columns) {
let result = 0
for column in sort_by_schema_order(columns) {
result = crc32.update(result, encode(column))
}
return result
}
```

* `columns` should be sorted by column ID. In the Avro schema, fields are already sorted by column ID, so you can directly use the order in `columns`.

* The `encode(column)` function encodes the column value into bytes. Encoding rules vary based on the data type of the column. The specific rules are as follows:

* TINYINT, SMALLINT, INT, BIGINT, MEDIUMINT, and YEAR types are converted to UINT64 and encoded in little-endian. For example, the number `0x0123456789abcdef` is encoded as `hex'0xefcdab8967452301'`.
* FLOAT and DOUBLE types are converted to DOUBLE and then encoded as UINT64 in IEEE754 format.
* BIT, ENUM, and SET types are converted to UINT64.

* BIT type is converted to UINT64 in binary format.
* ENUM and SET types are converted to their corresponding INT values in UINT64. For example, if the data value of a `SET('a','b','c')` type column is `'a,c'`, the value is encoded as `0b101`.

* TIMESTAMP, DATE, DURATION, DATETIME, JSON, and DECIMAL types are converted to STRING and then encoded as UTF8 bytes.
* VARBINARY, BINARY, and BLOB types (including TINY, MEDIUM, and LONG) are directly encoded as bytes.
* VARCHAR, CHAR, and TEXT types (including TINY, MEDIUM, and LONG) are encoded as UTF8 bytes.
* NULL and GEOMETRY types are excluded from the checksum calculation, and `encode` returns empty bytes for them.

> **Note:**
>
> After the checksum validation feature is enabled, data of the DECIMAL and UNSIGNED BIGINT types is converted to strings. Therefore, in the downstream consumer code, you need to convert it back to the corresponding numerical types before calculating the checksum values.
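
The preceding encoding rules can be sketched in Go as follows. This is a hedged, non-exhaustive illustration of a few representative cases, not the authoritative implementation; in particular, the byte order of the IEEE 754 bits is assumed to be little-endian, matching the integer rule (see the decoder linked below for the real behavior):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeColumn encodes a single column value into the bytes fed to CRC32,
// following the rules described above for a few representative types.
func encodeColumn(value interface{}) []byte {
	switch v := value.(type) {
	case int64: // TINYINT, SMALLINT, INT, BIGINT, MEDIUMINT, YEAR: UINT64, little-endian
		var buf [8]byte
		binary.LittleEndian.PutUint64(buf[:], uint64(v))
		return buf[:]
	case float64: // FLOAT and DOUBLE: IEEE 754 bits as UINT64 (byte order assumed little-endian)
		var buf [8]byte
		binary.LittleEndian.PutUint64(buf[:], math.Float64bits(v))
		return buf[:]
	case string: // CHAR/VARCHAR/TEXT, and stringified TIMESTAMP, DECIMAL, JSON, and so on
		return []byte(v)
	case []byte: // BINARY, VARBINARY, BLOB: used as-is
		return v
	default: // NULL and GEOMETRY contribute empty bytes
		return nil
	}
}

func main() {
	fmt.Printf("% x\n", encodeColumn(int64(0x0123456789abcdef))) // ef cd ab 89 67 45 23 01
}
```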

The consumer code written in Golang implements steps such as decoding the data read from Kafka, sorting the fields by schema order, and calculating the checksum value. For more information, see [`avro/decoder.go`](https://github.com/pingcap/tiflow/blob/master/pkg/sink/codec/avro/decoder.go).