Examples in Scala of
- Avro
- Kafka
- Schema Registry
- Kafka Streams
- with cats
- with ZIO, see also zio-kafka-streams
- Interactive Queries TODO
- with REST/http4s
- with GraphQL/Caliban
- KSQL
- Kafka Connect
- Extra
Local environment
# start locally
# - zookeeper
# - kafka
# - kafka-rest
# - kafka-ui
# - schema-registry
# - schema-registry-ui
# - ksql-server
# - ksql-cli
# - kafka-connect
# - kafka-connect-ui
docker-compose up
# (mac|linux) view kafka ui
[open|xdg-open] http://localhost:8000
# (mac|linux) view schema-registry ui
[open|xdg-open] http://localhost:8001
# (mac|linux) view kafka-connect ui
[open|xdg-open] http://localhost:8002
# cleanup
docker-compose down -v
If containers are crashing, make sure you have enough resources
# verify memory and cpu usage
docker ps -q | xargs docker stats --no-stream
# verify status
docker inspect <CONTAINER_NAME> | jq '.[].State'
Description
Avro serialization and deserialization examples of
- SpecificRecord code generation with sbt-avro [source|test]
- GenericRecord [source|test]
- avro4s [source|test]
- Java/Scala libraries compatibility [test]
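A minimal avro4s round-trip sketch of the kind of example listed above (hypothetical User case class, assuming the avro4s 4.x builder API; not the project's exact code):

import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

// hypothetical case class standing in for the project's generated record
case class User(name: String, favoriteNumber: Int)

object Avro4sRoundTrip extends App {
  val schema = AvroSchema[User]

  // serialize to Avro binary
  val out = new ByteArrayOutputStream()
  val output = AvroOutputStream.binary[User].to(out).build()
  output.write(User("alice", 42))
  output.close()

  // deserialize back with the writer schema
  val input = AvroInputStream.binary[User].from(out.toByteArray).build(schema)
  println(input.iterator.toList) // List(User(alice,42))
  input.close()
}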
Demo
# console
sbt avro/console
# generate avro classes
# avro/target/scala-2.12/classes/com/kafka/demo/User.class
sbt clean avro/compile
# test
sbt clean avro/test
- Data Serialization and Evolution
- Three Reasons Why Apache Avro Data Serialization is a Good Choice
- Schema evolution in Avro, Protocol Buffers and Thrift
- Avro Introduction for Big Data and Data Streaming Architectures
- Stream Avro Records into Kafka using Avro4s and Akka Streams Kafka
- Kafka, Spark and Avro
Description
Kafka API examples of
- KafkaProducer [doc|source] and KafkaConsumer [doc|source] clients
- CakeSolutions KafkaProducer [source|test] and KafkaConsumer [source|test] clients
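For orientation, a minimal sketch of the plain KafkaProducer client (broker address and topic are illustrative, following the local setup and the naming convention shown below; not the project's exact code):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object PlainProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)
  // topic name follows the <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME> convention
  producer.send(new ProducerRecord("example.no-schema.original", "my-key", "my-value"))
  producer.flush()
  producer.close()
}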
Demo
# access kafka
docker exec -it local-kafka bash
# create topic
# convention <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME>
# example [example.no-schema.original|example.no-schema.cakesolutions]
kafka-topics --zookeeper zookeeper:2181 \
--create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>
# delete topic
kafka-topics --zookeeper zookeeper:2181 \
--delete --topic <TOPIC_NAME>
# view topic
kafka-topics --zookeeper zookeeper:2181 --list
kafka-topics --zookeeper zookeeper:2181 --describe --topic <TOPIC_NAME>
# view topic offset
kafka-run-class kafka.tools.GetOffsetShell \
--broker-list kafka:9092 \
--time -1 \
--topic <TOPIC_NAME>
# list consumer groups
kafka-consumer-groups --bootstrap-server kafka:9092 --list
# view consumer group offset
kafka-consumer-groups \
--bootstrap-server kafka:9092 \
--group <GROUP_NAME> \
--describe
# reset consumer group offset
kafka-consumer-groups \
--bootstrap-server kafka:9092 \
--group <GROUP_NAME> \
--topic <TOPIC_NAME> \
--reset-offsets \
--to-earliest \
--execute
# console producer
kafka-console-producer --broker-list kafka:9092 --topic <TOPIC_NAME>
kafkacat -P -b 0 -t <TOPIC_NAME>
# console consumer
kafka-console-consumer --bootstrap-server kafka:9092 --topic <TOPIC_NAME> --from-beginning
kafkacat -C -b 0 -t <TOPIC_NAME>
# producer example
sbt "kafka/runMain com.kafka.demo.original.Producer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Producer"
# consumer example
sbt "kafka/runMain com.kafka.demo.original.Consumer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Consumer"
# test
sbt clean kafka/test
sbt "test:testOnly *KafkaSpec"
- DevOps Kafka
- Kafka topic naming conventions
- Should you put several event types in the same Kafka topic?
- How to choose the number of topics/partitions in a Kafka cluster?
- Kafka Partitioning
- The Log: What every software engineer should know about real-time data's unifying abstraction
- Apache Kafka: 8 things to check before going live
- Apache Kafka vs Apache Pulsar
Description
# register schema
# convention <TOPIC_NAME>-key or <TOPIC_NAME>-value
http -v POST :8081/subjects/example.with-schema.simple-value/versions \
Accept:application/vnd.schemaregistry.v1+json \
schema='{"type":"string"}'
# import schema from file
http -v POST :8081/subjects/example.with-schema.user-value/versions \
Accept:application/vnd.schemaregistry.v1+json \
schema=@avro/src/main/avro/user.avsc
# export schema to file
http :8081/subjects/example.with-schema.user-value/versions/latest \
| jq -r '.schema|fromjson' \
| tee avro/src/main/avro/user-latest.avsc
# list subjects
http -v :8081/subjects
# list subject's versions
http -v :8081/subjects/example.with-schema.simple-value/versions
# fetch by version
http -v :8081/subjects/example.with-schema.simple-value/versions/1
# fetch by id
http -v :8081/schemas/ids/1
# test compatibility
http -v POST :8081/compatibility/subjects/example.with-schema.simple-value/versions/latest \
Accept:application/vnd.schemaregistry.v1+json \
schema='{"type":"string"}'
# delete version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/1
# delete latest version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/latest
# delete subject
http -v DELETE :8081/subjects/example.with-schema.simple-value
# stringify
jq tostring avro/src/main/avro/user.avsc
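The same register/list/fetch flow is also available programmatically; a rough sketch with the Confluent client (assuming a 5.x kafka-schema-registry-client, where register still accepts an org.apache.avro.Schema):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

object SchemaRegistryClientSketch extends App {
  // 100 is the capacity of the local schema id cache
  val client = new CachedSchemaRegistryClient("http://localhost:8081", 100)

  val schema = new Schema.Parser().parse("""{"type":"string"}""")

  // equivalent of POST /subjects/example.with-schema.simple-value/versions
  val id = client.register("example.with-schema.simple-value", schema)
  println(s"registered schema id=$id")

  // equivalent of GET /subjects and GET /subjects/<subject>/versions/latest
  println(client.getAllSubjects)
  println(client.getLatestSchemaMetadata("example.with-schema.simple-value").getSchema)
}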
Demo
- BaseKafkaSchemaRegistrySpec to test Kafka with SchemaRegistry
- SpecificRecord with sbt-avrohugger [Producer|Consumer|test]
# generate SpecificRecord classes under "schema-registry/target/scala-2.12/src_managed/main/compiled_avro"
sbt clean schema-registry/avroScalaGenerateSpecific
# (optional) create schema
http -v POST :8081/subjects/example.with-schema.payment-key/versions \
Accept:application/vnd.schemaregistry.v1+json \
schema='{"type":"string"}'
http -v POST :8081/subjects/example.with-schema.payment-value/versions \
Accept:application/vnd.schemaregistry.v1+json \
schema=@schema-registry/src/main/avro/Payment.avsc
# access kafka
docker exec -it local-kafka bash
# (optional) create topic
kafka-topics --zookeeper zookeeper:2181 \
--create --if-not-exists --replication-factor 1 --partitions 1 --topic example.with-schema.payment
# console producer (binary)
kafka-console-producer --broker-list kafka:9092 --topic example.with-schema.payment
# console consumer (binary)
kafka-console-consumer --bootstrap-server kafka:9092 --topic example.with-schema.payment
# access schema-registry
docker exec -it local-schema-registry bash
# avro console producer
# example "MyKey",{"id":"MyId","amount":10}
kafka-avro-console-producer --broker-list kafka:29092 \
--topic example.with-schema.payment \
--property schema.registry.url=http://schema-registry:8081 \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"namespace":"io.confluent.examples.clients.basicavro","type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}'
# avro console consumer
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
--topic example.with-schema.payment \
--property schema.registry.url=http://schema-registry:8081 \
--property schema.id.separator=: \
--property print.key=true \
--property print.schema.ids=true \
--property key.separator=, \
--from-beginning
# producer example
sbt "schema-registry/runMain com.kafka.demo.specific.Producer"
# consumer example
sbt "schema-registry/runMain com.kafka.demo.specific.Consumer"
# tests
sbt "schema-registry/test:testOnly *KafkaSchemaRegistrySpecificSpec"
- GenericRecord with CakeSolutions [Producer|Consumer] and schema evolution test
# producer example
sbt "schema-registry/runMain com.kafka.demo.generic.Producer"
# consumer example
sbt "schema-registry/runMain com.kafka.demo.generic.Consumer"
# tests
sbt "schema-registry/test:testOnly *KafkaSchemaRegistryGenericSpec"
- Serializing data efficiently with Apache Avro and dealing with a Schema Registry
- Kafka, Avro Serialization and the Schema Registry
- Kafka, Streams and Avro serialization
- Avro and the Schema Registry
- Producing and Consuming Avro Messages over Kafka in Scala
Alternatives
- schema-repo
- Hortonworks Registry
TODO
- generic + schema evolution
- ovotech
- multi-schema
- formulation
Description
Kafka Streams API examples
Demo-1
# access kafka
docker exec -it local-kafka bash
# create topic
# example [example.to-upper-case-app.input|example.to-upper-case-app.output]
kafka-topics --zookeeper zookeeper:2181 \
--create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>
# ToUpperCaseApp example (input topic required)
sbt "streams/runMain com.kafka.demo.streams.ToUpperCaseApp"
# produce
kafka-console-producer --broker-list kafka:9092 \
--topic example.to-upper-case-app.input
# consume
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic example.to-upper-case-app.output
# test
sbt clean streams/test
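The topology behind this demo is roughly the following (a sketch with the Kafka Streams Scala DSL, assuming the 2.x org.apache.kafka.streams.scala.Serdes import; not the project's exact code):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

object ToUpperCaseSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "to-upper-case-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  // read from the input topic, upper-case each value, write to the output topic
  val builder = new StreamsBuilder()
  builder
    .stream[String, String]("example.to-upper-case-app.input")
    .mapValues(_.toUpperCase)
    .to("example.to-upper-case-app.output")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}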
Demo-2
Tested with embedded-kafka and embedded-kafka-schema-registry
# access kafka
docker exec -it local-kafka bash
# create topic
# example [json.streams-json-to-avro-app.input|avro.streams-json-to-avro-app.output]
kafka-topics --zookeeper zookeeper:2181 \
--create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>
# produce (default StringSerializer)
kafka-console-producer \
--broker-list kafka:9092 \
--property "parse.key=true" \
--property "key.separator=:" \
--property "key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
--property "value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
--topic <TOPIC_NAME>
# consume (default StringDeserializer)
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--from-beginning \
--property "print.key=true" \
--property "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
--property "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
--topic <TOPIC_NAME>
# access schema-registry
docker exec -it local-schema-registry bash
# consume avro
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
--property schema.registry.url=http://schema-registry:8081 \
--property schema.id.separator=: \
--property print.key=true \
--property print.schema.ids=true \
--property key.separator=, \
--from-beginning \
--topic <TOPIC_NAME>
# JsonToAvroApp example (input topic required)
sbt "streams-json-avro/runMain com.kafka.demo.JsonToAvroApp"
# test
sbt clean streams-json-avro/test
Example
# json
mykey:{"valueInt":42,"valueString":"foo"}
# log
[json.streams-json-to-avro-app.input]: mykey, JsonModel(42,foo)
[avro.streams-json-to-avro-app.output]: KeyAvroModel(mykey), ValueAvroModel(42,FOO)
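A rough sketch of how such a flow can be exercised with embedded-kafka (scalatest style; the io.github.embeddedkafka artifact, ports and assertion are illustrative, not the project's actual spec):

import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class RoundTripSpec extends AnyWordSpec with Matchers with EmbeddedKafka {

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 7001, zooKeeperPort = 7000)

  "the app input topic" should {
    "round-trip a json message" in {
      withRunningKafka {
        // publish a raw json value and read it back from the same topic
        publishStringMessageToKafka(
          "json.streams-json-to-avro-app.input",
          """{"valueInt":42,"valueString":"foo"}""")
        consumeFirstStringMessageFrom("json.streams-json-to-avro-app.input") should include("foo")
      }
    }
  }
}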
TODO
CatsKafkaStreamsApp [source]
# run app
sbt -jvm-debug 5005 "cats-kafka-streams/runMain com.kafka.demo.CatsKafkaStreamsApp"
TODO
ZioKafkaStreamsApp [source]
# run app
sbt -jvm-debug 5005 "zio-kafka-streams/runMain com.kafka.demo.ZioKafkaStreamsApp"
- Introducing Kafka Streams: Stream Processing Made Simple
- Unifying Stream Processing and Interactive Queries in Apache Kafka
- Of Streams and Tables in Kafka and Stream Processing
- How to use Apache Kafka to transform a batch pipeline into a real-time one
- Functional Programming with Kafka Streams and Scala
- Enabling Exactly-Once in Kafka Streams
Description
Setup Kafka
# access kafka
docker exec -it local-kafka bash
# create topic
kafka-topics --zookeeper zookeeper:2181 \
--create --if-not-exists --replication-factor 1 --partitions 1 --topic USER_PROFILE
# produce sample data
kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE << EOF
{"userid": 1000, "firstname": "Alison", "lastname": "Smith", "countrycode": "GB", "rating": 4.7}
EOF
# consume
kafka-console-consumer --bootstrap-server kafka:9092 --topic USER_PROFILE --from-beginning
Access KSQL CLI
- using the server
# access ksql-server
docker exec -it local-ksql-server bash
# start ksql cli
ksql http://ksql-server:8088
- using a local instance
# connect to local cli
docker exec -it local-ksql-cli ksql http://ksql-server:8088
- using a temporary instance
# connect to remote server
docker run --rm \
  --network=kafka-scala-examples_local_kafka_network \
  -it confluentinc/cp-ksql-cli http://ksql-server:8088
Execute SQL statements
# create stream
CREATE STREAM user_profile (\
userid INT, \
firstname VARCHAR, \
lastname VARCHAR, \
countrycode VARCHAR, \
rating DOUBLE \
) WITH (KAFKA_TOPIC = 'USER_PROFILE', VALUE_FORMAT = 'JSON');
# verify stream
list streams;
describe user_profile;
# query stream
SELECT userid, firstname, lastname, countrycode, rating FROM user_profile EMIT CHANGES;
Expect the consumer and the query to show the generated data
# generate data
docker run --rm \
-v $(pwd)/local/ksql:/datagen \
--network=kafka-scala-examples_local_kafka_network \
-it confluentinc/ksql-examples ksql-datagen \
bootstrap-server=kafka:29092 \
schemaRegistryUrl=http://schema-registry:8081 \
schema=datagen/user_profile.avro \
format=json \
topic=USER_PROFILE \
key=userid \
maxInterval=5000 \
iterations=100
- Kafka Connect
- Confluent's Kafka Connect API and connectors
- Udemy Course
- Kafka Connect Fundamentals
Setup PostgreSQL locally
# create shared network
docker-compose up
# start postgres
docker-compose -f docker-compose.postgres.yml up
# (mac|linux) view postgres ui
# [schema=public|database=postgres|username=postgres|password=postgres]
[open|xdg-open] http://localhost:8080
Setup connectors
# list connector
http -v :8083/connectors
# init data to generate schema
cp local/connect/data/resources-0.txt.orig local/connect/data/resources-0.txt
# setup spooldir source connector
http -v --json POST :8083/connectors < local/connect/config/source-spooldir-connector.json
# ingest data
echo "{\"accountId\":\"123\",\"resourceType\":\"XXX\",\"value\":\"X1\"}" > local/connect/data/resources-1.txt
# setup jdbc sink connector
# topic = SCHEMA.DATABASE = "public.postgres"
http -v --json POST :8083/connectors < local/connect/config/sink-jdbc-connector.json
# verify data
docker exec -it local-postgres bash -c "psql -U postgres postgres"
select * from public.postgres;
# cleanup
docker-compose -f docker-compose.postgres.yml down -v
- Old presentation
- What is the actual role of Zookeeper in Kafka?
- How to use Apache Kafka to transform a batch pipeline into a real-time one
- Kafka Partitioning
- Should you put several event types in the same Kafka topic?
- How to choose the number of topics/partitions in a Kafka cluster?
- Docker Tips and Tricks with KSQL and Kafka
- Introduction to Topic Log Compaction in Apache Kafka