
[TI-CDC] Using canal JSON to output to Kafka, different primary key types produce inconsistent results #6269

Closed
coralzu opened this issue Jul 7, 2022 · 19 comments
Labels
type/question The issue belongs to a question.


@coralzu

coralzu commented Jul 7, 2022

Bug Report

Please answer these questions before submitting your issue. Thanks!

1. Minimal reproduce step (Required)

1. Deploy TiDB and TiCDC
2. Create a table with an int type primary key

CREATE TABLE `int_id_table` (
  `int_id` int(11) NOT NULL,
  `var1` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`int_id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

3. Create another table with a varchar type primary key

CREATE TABLE `varchar_id_table` (
  `varchar_id` varchar(32) NOT NULL,
  `var2` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`varchar_id`) /*T![clustered_index] NONCLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

4. Create a TiCDC synchronization task to output the change logs of the above two tables to Kafka

{"changefeed_id":"test4","sink_uri":"kafka://xxxxxxx:9092/ticdc-test4?kafka-version=2.5.0&protocol=canal-json",
"filter_rules":["test.int_id_table","test.varchar_id_table"]}

5. Insert data into the above two tables, and then update their primary keys

insert into int_id_table value ('1','1');
insert into varchar_id_table value ('2','2');
update int_id_table set int_id = '3' where int_id = '1';
update varchar_id_table set varchar_id = '4' where varchar_id = '2';

2. What did you expect to see? (Required)

The primary key update log is split into a delete log and an insert log, which is convenient for downstream processing with Flink.

3. What did you see instead (Required)

The int type primary key change log is split, but the varchar type primary key change log is not.

4. What is your TiDB version? (Required)

Release Version: v5.4.0
Edition: Community
Git Commit Hash: 55f3b24c1c9f506bd652ef1d162283541e428872
Git Branch: heads/refs/tags/v5.4.0
UTC Build Time: 2022-01-25 08:39:26
GoVersion: go1.16.4
Race Enabled: false
TiKV Min Version: v3.0.0-60965b006877ca7234adaced7890d7b029ed1306
Check Table Before Drop: false

@Rustin170506
Member

I think that's to be expected.

You can check out https://docs.pingcap.com/tidb/v5.0/clustered-indexes.

For the int CLUSTERED primary key: an update event will be split into an insert event and a delete event.
For the varchar NONCLUSTERED primary key: an update event is still an update event.

You can try

CREATE TABLE `int_id_table` (
  `int_id` int(11) NOT NULL,
  `var1` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`int_id`) /*T![clustered_index] NONCLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

I think this will be an update event rather than an insert event and a delete event.
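
For illustration, with the CLUSTERED int primary key the key change arrives on the Kafka topic as two canal-json messages, roughly like this (an abridged, hand-written sketch: metadata fields such as es/ts/sqlType/mysqlType are omitted and the values are illustrative, not captured output):

{"database":"test","table":"int_id_table","pkNames":["int_id"],"isDdl":false,"type":"DELETE","data":[{"int_id":"1","var1":"1"}],"old":null}
{"database":"test","table":"int_id_table","pkNames":["int_id"],"isDdl":false,"type":"INSERT","data":[{"int_id":"3","var1":"1"}],"old":null}

With the NONCLUSTERED varchar primary key, the same kind of change stays a single UPDATE message.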

@Rustin170506
Member

I transferred this issue from pingcap/tidb to pingcap/tiflow, because TiCDC was developed in this repo.

@Rustin170506
Member

The primary key update log is split into a delete log and an insert log, which is convenient for downstream processing with Flink.

You mean Flink can't handle update events?

@coralzu
Author

coralzu commented Jul 13, 2022

Flink inserted the updated data but did not delete the pre-update data, because there is no delete event.

@coralzu
Author

coralzu commented Jul 13, 2022

This causes invalid data to accumulate when I use TiCDC as the source to synchronize data downstream, because the old-ID rows are never deleted.
This may be related to the level of support for the canal-json format in Flink.

@Rustin170506
Member

Got it. I'll test it.

@Rustin170506
Member

I tested it. It seems to work.

  1. start tidb cluster v5.4.0
  2. start kafka and flink with this docker-compose file
  3. create varchar_id_table table in tidb
  4. insert data insert into varchar_id_table value ('4','2');
  5. start flink sql
  6. create table
CREATE TABLE topic_test
(
    varchar_id  varchar(32) primary key,
    var2 varchar(255)
) WITH (
      'connector' = 'kafka',
      'topic' = 'ticdc-test',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'canal-json',
      'canal-json.ignore-parse-errors' = 'true'
      );
  7. select from the table
select *
from topic_test;
  8. update a value: update varchar_id_table set varchar_id = '8' where varchar_id = '4';
  9. check that the result is updated
Screen.Recording.2022-07-18.at.2.51.57.PM.mov

@Rustin170506
Member

I also tested it with:

insert into varchar_id_table value ('2','2');
update varchar_id_table set varchar_id = '4' where varchar_id = '2';

This also works well.

@Rustin170506
Member

This causes invalid data to accumulate when I use TiCDC as the source to synchronize data downstream, because the old-ID rows are never deleted.

Maybe I don't understand what you mean. Can you explain why you think "update" doesn't work for you?

@coralzu
Author

coralzu commented Jul 18, 2022

Sorry for my negligence. To add more detail: the Flink SQL I use can parse the update log in a query statement, but when it is passed downstream, the update log is processed as an upsert. When the primary key changes there is no delete event, so the old primary key row is not deleted and only the new primary key row is inserted.
My Flink SQL looks like this:

CREATE TABLE test
(
    `varchar_id`  string primary key,
    `var2` string 
 ) WITH (
      'connector' = 'kafka',
      'topic' = 'ticdc-test4',
      'properties.bootstrap.servers' = 'xxx:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'canal-json',
      'canal-json.ignore-parse-errors' = 'true'
);
	  
create table sink(
    `varchar_id`  string primary key,
    `var2` string 
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://xxx:5432/xwflinktest?searchpath=public',
    'table-name' = 'varchar_id_table_copy',
    'username' = 'xxx',
    'password' = 'xxx'
);

insert into sink select * from test;

This will build a real-time synchronization task running in Flink.

The Kafka data is shown in the attached screenshot.

The downstream database data is shown in the attached screenshot.

@coralzu
Author

coralzu commented Jul 18, 2022

When the update does not involve the primary key, it works normally: Flink uses the upsert method to overwrite the data. When the primary key changes, the upsert uses the after content of the update log, so it only writes the updated data to the downstream without deleting the before data. This is why I expect TiCDC to split the event into delete and insert events, as it does for int primary keys.
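
To make that concrete, the unsplit change for the varchar table arrives as a single canal-json UPDATE message, roughly like this (an abridged, hand-written sketch: metadata fields are omitted, values are illustrative, and the exact contents of old depend on the old-value setting):

{"database":"test","table":"varchar_id_table","pkNames":["varchar_id"],"isDdl":false,"type":"UPDATE","data":[{"varchar_id":"4","var2":"2"}],"old":[{"varchar_id":"2"}]}

An upsert sink keys on the new varchar_id in data ('4'), so the row with the old key ('2'), which only appears in old, is never deleted downstream.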

@Rustin170506
Member

Rustin170506 commented Jul 18, 2022

This is more about the internal mechanism of Flink and the behavior of the JDBC sink. You can set enable-old-value = false in the changefeed config file; then you will always get delete and insert events.

@Rustin170506
Member

But please be aware of this bug: #6198

@coralzu
Author

coralzu commented Jul 19, 2022

I tried using the changefeed config to set the enable-old-value option, but it doesn't seem to work.

My config:

enable-old-value = false
[filter]
rules = ['test.int_id_table','test.varchar_id_table']
[sink]
protocol = "canal-json"

My create command:

./cdc cli changefeed create --pd=http://xxx:2379 --sink-uri="kafka://xxx:9092/ticdc-test4?kafka-version=2.5.0&protocol=canal-json" --config ./changefeed_config.toml

The synchronization task information is shown in the attached screenshots.

Is my configuration wrong?

@Rustin170506
Member

Emmm, sorry, I forgot that when you use the canal-json protocol, you have to use the old value.

@coralzu
Author

coralzu commented Jul 19, 2022

At present, I can only avoid synchronization tasks for varchar primary keys. Although this is caused by Flink's mechanism, I still expect TiCDC to be able to unify the log output behavior through a configuration parameter, which is very useful for giving users a consistent experience.

By the way, I have also tried to use the TiCDC connector from the Flink Chinese community to do this kind of work, but I found it is not stable enough. I have also raised such problems in the relevant community
(apache/flink-cdc#1154).

@Rustin170506
Member

Can you try the Avro protocol? It also works well with Flink, but you need to use the v6.1 version (other versions have some bugs).

@coralzu
Author

coralzu commented Jul 20, 2022

The project has no plan to upgrade TiDB for the time being. I'll try it if I get a chance.

@Rustin170506
Member

I'll close this issue. If you have any other questions, please feel free to reopen this issue.
