Skip to content

Commit

Permalink
feat(transactions): Transaction added to garantee only once paradigm.…
Browse files Browse the repository at this point in the history
… Related to #265
  • Loading branch information
marcosschroh committed Feb 14, 2025
1 parent d214a76 commit 4259bf1
Show file tree
Hide file tree
Showing 24 changed files with 1,253 additions and 37 deletions.
Empty file added docs/transactions.md
Empty file.
69 changes: 69 additions & 0 deletions examples/transactions/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Kstreams Transaction example

This example shows how to use `kafka transactions` with `kstreams`. For this purpose we have to setup a local kafka with 3 `brokers` and `kafka-ui` to explore
the `topics offsets` and `groups lag`.

## Requirements

`python 3.8+`, `poetry`, `docker-compose`

### Usage

First run the local cluster:

```bash
./scripts/cluster/start
```

Second, you need to install the project dependencies dependencies. In a different terminal execute:

```bash
poetry install
```

Then we can run the project

```bash
poetry run app
```

You should see something similar to the following logs:

```bash
kstreams/examples/transactions via 🐳 colima is 📦 v0.1.0 via 🐍 v3.12.4
❯ poetry run app

INFO:ssl_example.app:Starting application...
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--kstreams'})
INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'local--kstreams'}
INFO:kstreams.prometheus.monitor:Starting Prometheus Monitoring started...
INFO:ssl_example.app:Producing event 0
INFO:ssl_example.app:Producing event 1
INFO:ssl_example.app:Producing event 2
INFO:ssl_example.app:Producing event 3
INFO:ssl_example.app:Producing event 4
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group example-group
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group example-group
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group example-group
INFO:aiokafka.consumer.group_coordinator:Joined group 'example-group' (generation 1) with member_id aiokafka-0.11.0-5fb10c73-64b2-42a8-ae8a-23f59d4a3b6b
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group example-group with generation 1
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--kstreams', partition=0)} for group example-group
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
104 changes: 104 additions & 0 deletions examples/transactions/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
version: '3'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:7.7.0"
hostname: zookeeper
container_name: kstream_zookeeper
ports:
- 32181:32181
networks:
- kafka
environment:
- ZOOKEEPER_CLIENT_PORT=32181
broker-1:
image: confluentinc/cp-kafka:7.7.0
hostname: broker-1
container_name: broker-1
ports:
- 9091:9091
depends_on:
- zookeeper
networks:
- kafka
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9091,INTERNAL://broker-1:9010
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_BROKER_ID=1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
- CONFLUENT_METRICS_ENABLE=true
- CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
- KAFKA_AUTO_CREATE_TOPICS_ENABL="true"
- KAFKA_JMX_PORT=19101
broker-2:
image: confluentinc/cp-kafka:7.7.0
hostname: broker-2
container_name: broker-2
ports:
- 9092:9092
depends_on:
- zookeeper
networks:
- kafka
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9092,INTERNAL://broker-2:9010
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_BROKER_ID=2
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
- CONFLUENT_METRICS_ENABLE=true
- CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
- KAFKA_AUTO_CREATE_TOPICS_ENABL="true"
- KAFKA_JMX_PORT=19102
broker-3:
image: confluentinc/cp-kafka:7.7.0
hostname: broker-3
container_name: broker-3
ports:
- 9093:9093
depends_on:
- zookeeper
networks:
- kafka
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9093,INTERNAL://broker-3:9010
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_BROKER_ID=3
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
- CONFLUENT_METRICS_ENABLE=true
- CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
- KAFKA_AUTO_CREATE_TOPICS_ENABL="true"
- KAFKA_JMX_PORT=19103
kafka-ui:
container_name: kafka-ui
image: kafbat/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- broker-1
- broker-2
- broker-3
environment:
KAFKA_CLUSTERS_0_NAME: broker-1
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker-1:9010
KAFKA_CLUSTERS_0_METRICS_PORT: 19101
KAFKA_CLUSTERS_1_NAME: broker-2
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: broker-2:9010
KAFKA_CLUSTERS_1_METRICS_PORT: 19102
KAFKA_CLUSTERS_2_NAME: broker-3
KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: broker-3:9010
KAFKA_CLUSTERS_2_METRICS_PORT: 19103
DYNAMIC_CONFIG_ENABLED: 'true'
networks:
- kafka

networks:
kafka:
name: kafka
Loading

0 comments on commit 4259bf1

Please sign in to comment.