Have the following tools installed:
- Docker
- Docker Compose
- Maven
- Java 17
- jq
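You can quickly verify that the tools are available before starting:
# check the prerequisites
docker --version
docker compose version
mvn -version
java -version
jq --version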
cd env
docker compose up -d
cd ..
# compile the project
mvn clean package
Control Center is available at http://localhost:9021
Creating the needed topics:
# create topics
cd env/
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic crm.users
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic crm.contracts
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic crm.generic-dlq
cd ..
cd readwrite-rules-app
# user subject
jq -n --rawfile schema src/main/resources/schema/user.avsc '{schema: $schema}' | \
curl --silent http://localhost:8081/subjects/crm.users-value/versions --json @- | jq
# contract subject
jq -n --rawfile schema src/main/resources/schema/contract.avsc '{schema: $schema}' | \
curl --silent http://localhost:8081/subjects/crm.contracts-value/versions --json @- | jq
Note
- We have created two schemas, one for users and one for contracts.
- They are plain vanilla schemas, with no metadata or rules (you can verify this with the check below).
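To double-check what was stored, fetch the latest version of a subject from the Schema Registry (shown for the users subject; the contracts subject works the same way):
# inspect the latest registered version of the user subject
curl -s http://localhost:8081/subjects/crm.users-value/versions/latest | jq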
# user metadata
curl -s http://localhost:8081/subjects/crm.users-value/versions \
--header "Content-Type: application/json" --header "Accept: application/json" \
--data "@src/main/resources/schema/user-metadata.json" | jq
# contract metadata
curl -s http://localhost:8081/subjects/crm.contracts-value/versions \
--header "Content-Type: application/json" --header "Accept: application/json" \
--data "@src/main/resources/schema/contract-metadata.json" | jq
Note
- We have added metadata to the schemas, indicating whether they are GDPR sensitive, plus the owner and the owner's e-mail for these subjects.
- As you can see, the metadata is not validated; it is just stored in the Schema Registry (you can inspect it with the check below).
- Another important point: the id is increased by one, indicating a new version of the subject.
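To see the stored metadata together with the new id and version, fetch the latest version and filter the relevant fields:
# show the version, id and metadata of the latest user schema
curl -s http://localhost:8081/subjects/crm.users-value/versions/latest | jq '{version, id, metadata}'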
# users rules
curl http://localhost:8081/subjects/crm.users-value/versions \
--header "Content-Type: application/json" --header "Accept: application/json" \
--data @src/main/resources/schema/user-ruleset.json | jq
# contracts rules
curl http://localhost:8081/subjects/crm.contracts-value/versions \
--header "Content-Type: application/json" --header "Accept: application/json" \
--data @src/main/resources/schema/contract-ruleset.json | jq
Note
- We have added rules to the schemas, for example:
  - The user schema has a rule that the user must be older than 18 years old.
  - The contract schema has a rule that the contract must have a valid date.
- As before, the id is increased by one, indicating a new version of the subject (you can inspect the rules with the check below).
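The registered rules can be inspected the same way, by filtering the ruleSet of the latest version:
# show the ruleSet attached to the latest user and contract schemas
curl -s http://localhost:8081/subjects/crm.users-value/versions/latest | jq '.ruleSet'
curl -s http://localhost:8081/subjects/crm.contracts-value/versions/latest | jq '.ruleSet'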
Run the producer and check which events are accepted or refused due to the condition rules.
java -classpath target/readwrite-rules-app-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.readwrite.ProducerRunner
Tip
- Check the logs to see the events being produced and the ones that were not accepted by the rules.
- The producer produces 8 events: 3 of them will be accepted and 5 will be refused.
- In some cases, the refused events will be sent to the DLQ topic (check the rules to see which ones).
Run the consumer and check how events are transformed during consumption.
java -classpath target/readwrite-rules-app-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.readwrite.ConsumerRunner
Important
One of the users is transformed during consumption. The fullName is created by concatenating the firstName and lastName.
Check the DLQ topic to see the events that were not accepted by the rules and their headers.
kafka-console-consumer --bootstrap-server localhost:29092 \
--property schema.registry.url=http://localhost:8081 \
--property print.timestamp=false \
--property print.offset=false \
--property print.partition=false \
--property print.headers=true \
--property print.key=false \
--property print.value=true \
--topic crm.generic-dlq \
--from-beginning
Important
- The headers are printed in the console, showing the reason why the event was sent to the DLQ.
- Different events can be sent to the same DLQ topic, so the headers are important to understand which topic the event should have been sent to (see the filter below).
- The event is sent to the DLQ topic in JSON format, so it is easy to read it and understand why it was sent to the DLQ.
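If you only want to see the headers, you can keep just the first column of the consumer output (a quick filter, assuming the default tab separator between the printed fields):
# same consumer as above, printing only the headers column
kafka-console-consumer --bootstrap-server localhost:29092 \
  --property print.headers=true \
  --topic crm.generic-dlq \
  --from-beginning | cut -f1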
Create the resources
cd env
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic warehouse.products
cd ..
# register v1 product
jq -n --rawfile schema migration-app-v1/src/main/resources/schema/product.avsc '{schema: $schema, metadata: { properties: { app_version: 1 }}}' | \
curl --silent http://localhost:8081/subjects/warehouse.products-value/versions --json @- | jq
# register v2 product
jq -n --rawfile schema migration-app-v2/src/main/resources/schema/product.avsc '{schema: $schema, metadata: { properties: { app_version: 2 }}}' | \
curl --silent http://localhost:8081/subjects/warehouse.products-value/versions --json @- | jq
Warning
The V2 registration will fail: the default compatibility check is applied, and the schema we are trying to register breaks it.
The error message is similar to:
{
"error_code": 409,
"message": "Schema being registered is incompatible with an earlier schema for subject \"warehouse.products-value\"
...
We need to enable compatibility groups. Let's tell the Schema Registry to use the app_version metadata property as the compatibility group:
curl http://localhost:8081/config/warehouse.products-value \
-X PUT --json '{ "compatibilityGroup": "app_version" }'
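You can confirm that the subject-level config was applied:
# check the subject-level config
curl -s http://localhost:8081/config/warehouse.products-value | jq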
We try again
# register v2 product
jq -n --rawfile schema migration-app-v2/src/main/resources/schema/product.avsc '{schema: $schema, metadata: { properties: { app_version: 2 }}}' | \
curl --silent http://localhost:8081/subjects/warehouse.products-value/versions --json @- | jq
It should work now, and we have a new schema version. Next, let's register the migration rules:
curl http://localhost:8081/subjects/warehouse.products-value/versions \
--json @migration-app-v2/src/main/resources/schema/product-migration-rules.json | jq
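Listing the versions of the subject confirms that each successful registration created a new version:
# list all versions of the products subject
curl -s http://localhost:8081/subjects/warehouse.products-value/versions | jq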
⚠️ There is an important point: we need to indicate the app_version in our producers, so we add the following lines:
// indicates the app_version=1
properties.put(KafkaAvroDeserializerConfig.USE_LATEST_WITH_METADATA, "app_version=1");
productProducerV1 = new KafkaProducer<>(properties);
// indicates the app_version=2
properties.put(KafkaAvroDeserializerConfig.USE_LATEST_WITH_METADATA, "app_version=2");
productProducerV2 = new KafkaProducer<>(properties);
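To check which registered version carries which app_version, you can loop over the versions and print their metadata:
# show the metadata of each registered version of the products subject
for v in $(curl -s http://localhost:8081/subjects/warehouse.products-value/versions | jq '.[]'); do
  curl -s http://localhost:8081/subjects/warehouse.products-value/versions/$v | jq '{version, metadata}'
done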
cd migration-app-v1
java -classpath target/migration-app-v1-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.migration.ProducerRunner
cd migration-app-v2
java -classpath target/migration-app-v2-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.migration.ProducerRunner
Important
What is happening now?
- The producer v1 is producing events in the v1 format (app_version=1), where the dimensions are part of the main product event.
- The producer v2 is producing events in the v2 format (app_version=2), where the dimensions are nested inside Dimension.
What are our expectations?
- The consumers v1 and v2 will consume the events produced by both producers v1 and v2, even though they use different schemas.
- The consumer v1 will transform the events produced by the producer v2 into the v1 format.
- The consumer v2 will transform the events produced by the producer v1 into the v2 format.
- The consumer code is agnostic: it does not know what the original schema of each event was (the registered migration rules can be inspected with the check below).
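The migration rules registered above (from product-migration-rules.json) are what make these transformations possible; they can be inspected on the latest schema version:
# show the ruleSet (including the migration rules) of the latest products schema
curl -s http://localhost:8081/subjects/warehouse.products-value/versions/latest | jq '.ruleSet'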
cd migration-app-v1
java -classpath target/migration-app-v1-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.migration.ConsumerRunner
Important
Check the logs to confirm that the consumer v1 transforms the events produced by the producer v2 into the v1 format.
cd migration-app-v2
java -classpath target/migration-app-v2-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.migration.ConsumerRunner
Important
Check the logs to confirm that the consumer v2 transforms the events produced by the producer v1 into the v2 format.
Creating the needed topics and compiling the project
# create topics
cd env/
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic data.clients
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic data.orders
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic data.products
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic data.dlq.invalid.clients
docker compose exec broker1 kafka-topics --bootstrap-server broker1:9092 --create --topic data.dlq.invalid.products
cd ..
cd global-rules-app
Register the global ruleset. This step needs to be done before creating the schemas:
curl -s http://localhost:8081/config \
-X PUT \
--header "Content-Type: application/json" \
--data @src/main/resources/schema/global-ruleset_v1.json | jq
# client subject
jq -n --rawfile schema src/main/resources/schema/client.avsc '{schema: $schema}' | \
curl --silent http://localhost:8081/subjects/data.clients-value/versions --json @- | jq
curl -s http://localhost:8081/config \
-X PUT \
--header "Content-Type: application/json" \
--data @src/main/resources/schema/global-ruleset_v2.json | jq
Important
- The overrideRuleSet, the defaultRuleSet and the rules defined in a schema have an order of precedence, as described in Configuration enhancements:
  - first, the default value is taken
  - then, the value from the schema is merged on top of it
  - finally, the override value is merged, giving the final result
- Schemas that were already created are not affected by the overrideRuleSet, only new ones (see the check below).
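To see what is currently stored at the global level, fetch the global config:
# show the global config, including the default and override rulesets
curl -s http://localhost:8081/config | jq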
# client subject
jq -n --rawfile schema src/main/resources/schema/client.avsc '{schema: $schema}' | \
curl --silent http://localhost:8081/subjects/data.clients-value/versions --json @- | jq
# orders subject
jq -n --rawfile schema src/main/resources/schema/order.avsc '{schema: $schema}' | \
curl --silent http://localhost:8081/subjects/data.orders-value/versions --json @- | jq
# products subject
jq -n --rawfile schema src/main/resources/schema/product.avsc '{schema: $schema}' | \
curl --silent http://localhost:8081/subjects/data.products-value/versions --json @- | jq
All the schemas are registered with the defaultRuleSet and overrideRuleSet merged in.
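You can confirm this by checking the ruleSet stored with the latest version of each subject:
# show the ruleSet merged into each newly registered subject
for s in data.clients-value data.orders-value data.products-value; do
  echo "== $s"
  curl -s http://localhost:8081/subjects/$s/versions/latest | jq '.ruleSet'
done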
## Run the Client producer
java -classpath target/global-rules-app-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.tomasalmeida.data.contract.globalrules.ClientProducerRunner
Note
- Rules application (you can check which fields carry which tags with the command shown after this list):
  - The ClientIdValidation rule is applied to any field which has the tag CLIENTID.
  - The ProductIdValidation rule is applied to any field which has the tag PRODUCTID.
  - The CountryValidation rule is applied to any field named countryCode.
- Product, client and order events are produced, and since the rules are copied into the schemas, they are executed during production.
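The tags driving this behaviour live in the Avro schemas themselves; assuming the fields carry the standard confluent:tags property, you can list which fields are tagged:
# list the fields of the client schema together with their tags
curl -s http://localhost:8081/subjects/data.clients-value/versions/latest | \
  jq '.schema | fromjson | .fields[] | {name, tags: .["confluent:tags"]}'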
Finally, let's check the DLQ topics. Note that the CountryValidation rule does not send data to a DLQ topic, but the ClientIdValidation and ProductIdValidation rules do.
kafka-console-consumer --bootstrap-server localhost:29092 \
--property schema.registry.url=http://localhost:8081 \
--property print.timestamp=false \
--property print.offset=false \
--property print.partition=false \
--property print.headers=true \
--property print.key=false \
--property print.value=true \
--topic data.dlq.invalid.clients \
--from-beginning
kafka-console-consumer --bootstrap-server localhost:29092 \
--property schema.registry.url=http://localhost:8081 \
--property print.timestamp=false \
--property print.offset=false \
--property print.partition=false \
--property print.headers=true \
--property print.key=false \
--property print.value=true \
--topic data.dlq.invalid.products \
--from-beginning
- Stop the consumers and producers
- Stop the environment
cd env
docker compose down -v && docker system prune -f
cd ..