-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
avro: Add name strategy option for schema registry #3936
Conversation
This is for issue pingcap#1147 When creating a changefeed with `--opts nameStrategy=topic` this now uses the topic name as basis for the schema registry. Before this PR the schema registry always used `{schema}_{table}` as basis. This solves the case where KSQL expect the topic name to be used as basis for the schema registry. == Example Setup a test environment and create the changefeed ``` tiup playground --without-monitor --tiflash 0 v5.3.0 confluent local services start ./bin/cdc server ./bin/cdc cli changefeed create --no-confirm \ --changefeed-id="simple-replication-task" \ --sort-engine="unified" \ --sink-uri="kafka://127.0.0.1:9092/cdctest?protocol=avro&kafka-version=2.8.0" \ --opts registry="http://127.0.0.1:8081" \ --opts nameStrategy=topic ``` To verify the `nameStrategy`: ``` $ ./bin/cdc cli changefeed query -c simple-replication-task | jq '.info.opts.nameStrategy' "topic" ``` Now let's create a table that uses this changefeed: ``` CREATE TABLE t1 (id INT PRIMARY KEY, n VARCHAR(200) NOT NULL, ts TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)); INSERT INTO t1(id, n) VALUES (1, 'test'); ``` This now shows up in KSQL like this: ``` ksql> SHOW TOPICS; Kafka Topic | Partitions | Partition Replicas --------------------------------------------------------------- cdctest | 3 | 1 default_ksql_processing_log | 1 | 1 --------------------------------------------------------------- ksql> PRINT cdctest FROM BEGINNING; Key format: AVRO or KAFKA_STRING Value format: AVRO rowtime: 2021/12/17 07:06:07.227 Z, key: {"id": 1}, value: {"id": 1, "n": "test", "ts": 1639724765242}, partition: 0 ^CTopic printing ceased ``` And in the schema registry: ``` $ curl -s http://127.0.0.1:8081/subjects | jq [ "cdctest-key", "cdctest-value" ] ``` This now allows us to do this: ``` ksql> CREATE STREAM t1str WITH (KAFKA_TOPIC='cdctest', VALUE_FORMAT='avro'); Message ---------------- Stream created ---------------- ksql> DESCRIBE t1str; Name : T1STR Field | Type ------------------------- ID | INTEGER N | VARCHAR(STRING) TS | TIMESTAMP ------------------------- For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED; ``` Note that we didnt' have to specify the columns in the `CREATE STREAM` comman as these were fetched from the schema registry. See also: - https://docs.ksqldb.io/en/latest/operate-and-deploy/schema-registry-integration/#schema-inference
[REVIEW NOTIFICATION] This pull request has not been approved. To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
Thank you @dveeden for your PR. It would help us a lot because we can now use single-table changefeed and name the Kafka topic as we want without having to match the topic name with the
Debezium MySQL Connector uses the above mentioned implementation, it creates one Kafka topic per table automatically if the topic doesn't exist (or use the topic if it exists). Of course, we need to configure the Kafka cluster to enable auto topic creation (or create the topic manually before creating the table), but that doesn't raise any serious issue for us. Debezium also has |
@keweishang: Thanks for your review. The bot only counts approvals from reviewers and higher roles in list, but you're still welcome to leave your comments. In response to this: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
@keweishang Thanks for the information. That helps a lot. |
Codecov Report
Flags with carried forward coverage won't be shown. Click here to find out more. @@ Coverage Diff @@
## master #3936 +/- ##
================================================
+ Coverage 55.1722% 55.3314% +0.1592%
================================================
Files 485 489 +4
Lines 59829 60593 +764
================================================
+ Hits 33009 33527 +518
- Misses 23484 23689 +205
- Partials 3336 3377 +41 |
@dveeden: PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
In 6.1, with the implementation of #5338 and #4423 , now it's possible to achieve what this PR does. Generally speaking, you could use one changefeed to replicate multiple tables to kafka, each table corresponds to one topic. This is the solution 2 described by @dveeden in the PR description. I think we could close this PR now. |
What problem does this PR solve?
This improves compatibility with KSQL
This is for issue #1147
What is changed and how it works?
When creating a changefeed with
--opts nameStrategy=topic
this nowuses the topic name as basis for the schema registry. Before this PR
the schema registry always used
{schema}_{table}
as basis.This solves the case where KSQL expect the topic name to be used as
basis for the schema registry.
Example
Setup a test environment and create the changefeed
To verify the
nameStrategy
:Now let's create a table that uses this changefeed:
This now shows up in KSQL like this:
And in the schema registry:
This now allows us to do this:
Note that we didnt' have to specify the columns in the
CREATE STREAM
statement as these were fetched from the schema registry.
See also:
Considerations
If you have a changefeed it maps to a single topic. With the
nameStrategy=topic
this means that there can only be one set of subjects registered in the registry. As soon as you try to use this topic for more than one table the registry will respond with a "HTTP 409 Conflict".This doesn't have to be a problem as you could create multiple changefeeds, each with a filter for a single table and an unique topic.
This can even be seen as a feature as this will detect multiple incompatible things being written to the same topic.
Another solution is to implement multi-topic support, where a single changefeed creates one topic per table. However this would run into problems if topic auto creation is disabled, especially with online schema change tools like gh-ost and pt-osc which create temporary tables. However it might be possible to filter out OSC tables as these tables have predictable names.
Without this PR you could reach almost the same functionality by setting the topic in the sink URL to
{schema}_{table}
, this causes the topic name and the name in the schema registry to match for this table. However when a second table replicated via the same changefeed this will register itself under another name and won't result in an error, so this doesn't enforce schema consistency in the same way. Also when having multiple TiDB and CDC instances using the same Kafka infrastructure this makes it hard to keep topics unique as the{schema}_{table}
name might not be unique across multiple sources. With this PR you could name the topic{some_prefix}_{schema}_{table}
avoiding this problem.Check List
Tests
Code changes
Side effects
Related changes
Release note