diff --git a/docs/transactions.md b/docs/transactions.md
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/transactions/README.md b/examples/transactions/README.md
new file mode 100644
index 00000000..dec6e93f
--- /dev/null
+++ b/examples/transactions/README.md
@@ -0,0 +1,69 @@
+# Kstreams Transaction example
+
+This example shows how to use Kafka transactions with `kstreams`. For this purpose we set up a local Kafka cluster with 3 brokers, plus `kafka-ui` to explore
+topic offsets and consumer group lag.
+
+## Requirements
+
+`python 3.12+`, `poetry`, `docker-compose`
+
+### Usage
+
+First, run the local cluster:
+
+```bash
+./scripts/cluster/start
+```
+
+Second, install the project dependencies. In a different terminal, execute:
+
+```bash
+poetry install
+```
+
+Then we can run the project:
+
+```bash
+poetry run app
+```
+
+You should see something similar to the following logs. The `GroupCoordinatorNotAvailableError` entries are expected while the brokers finish starting up:
+
+```bash
+kstreams/examples/transactions via 🐳 colima is 📦 v0.1.0 via 🐍 v3.12.4
+❯ poetry run app
+
+INFO:ssl_example.app:Starting application...
+INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--kstreams'})
+INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'local--kstreams'}
+INFO:kstreams.prometheus.monitor:Starting Prometheus Monitoring started...
+INFO:ssl_example.app:Producing event 0
+INFO:ssl_example.app:Producing event 1
+INFO:ssl_example.app:Producing event 2
+INFO:ssl_example.app:Producing event 3
+INFO:ssl_example.app:Producing event 4
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+ERROR:aiokafka.consumer.group_coordinator:Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
+INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group example-group
+INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group example-group
+INFO:aiokafka.consumer.group_coordinator:(Re-)joining group example-group
+INFO:aiokafka.consumer.group_coordinator:Joined group 'example-group' (generation 1) with member_id aiokafka-0.11.0-5fb10c73-64b2-42a8-ae8a-23f59d4a3b6b
+INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using 
roundrobin
+INFO:aiokafka.consumer.group_coordinator:Successfully synced group example-group with generation 1
+INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--kstreams', partition=0)} for group example-group
+```
+
+## Note
+
+If you plan to reuse this example outside this repository, pay attention to the `pyproject.toml` dependencies, where
+`kstreams` points to the parent folder. You will have to replace it with the latest published version.
diff --git a/examples/transactions/docker-compose.yaml b/examples/transactions/docker-compose.yaml
new file mode 100644
index 00000000..cd502512
--- /dev/null
+++ b/examples/transactions/docker-compose.yaml
@@ -0,0 +1,104 @@
+version: '3'
+services:
+  zookeeper:
+    image: "confluentinc/cp-zookeeper:7.7.0"
+    hostname: zookeeper
+    container_name: kstream_zookeeper
+    ports:
+      - 32181:32181
+    networks:
+      - kafka
+    environment:
+      - ZOOKEEPER_CLIENT_PORT=32181
+  broker-1:
+    image: confluentinc/cp-kafka:7.7.0
+    hostname: broker-1
+    container_name: broker-1
+    ports:
+      - 9091:9091
+    depends_on:
+      - zookeeper
+    networks:
+      - kafka
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
+      - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9091,INTERNAL://broker-1:9010
+      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
+      - KAFKA_BROKER_ID=1
+      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
+      - CONFLUENT_METRICS_ENABLE=true
+      - CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE="true"
+      - KAFKA_JMX_PORT=19101
+  broker-2:
+    image: confluentinc/cp-kafka:7.7.0
+    hostname: broker-2
+    container_name: broker-2
+    ports:
+      - 9092:9092
+    depends_on:
+      - zookeeper
+    networks:
+      - kafka
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
+      - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9092,INTERNAL://broker-2:9010
+      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
+      - KAFKA_BROKER_ID=2
+      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
+      - CONFLUENT_METRICS_ENABLE=true
+      - CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE="true"
+      - KAFKA_JMX_PORT=19102
+  broker-3:
+    image: confluentinc/cp-kafka:7.7.0
+    hostname: broker-3
+    container_name: broker-3
+    ports:
+      - 9093:9093
+    depends_on:
+      - zookeeper
+    networks:
+      - kafka
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
+      - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9093,INTERNAL://broker-3:9010
+      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
+      - KAFKA_BROKER_ID=3
+      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
+      - CONFLUENT_METRICS_ENABLE=true
+      - CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE="true"
+      - KAFKA_JMX_PORT=19103
+  kafka-ui:
+    container_name: kafka-ui
+    image: kafbat/kafka-ui:latest
+    ports:
+      - 8080:8080
+    depends_on:
+      - broker-1
+      - broker-2
+      - broker-3
+    environment:
+      KAFKA_CLUSTERS_0_NAME: broker-1
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker-1:9010
+      KAFKA_CLUSTERS_0_METRICS_PORT: 19101
+      KAFKA_CLUSTERS_1_NAME: broker-2
+      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: broker-2:9010
+      KAFKA_CLUSTERS_1_METRICS_PORT: 19102
+      KAFKA_CLUSTERS_2_NAME: broker-3
+      KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: broker-3:9010
+      KAFKA_CLUSTERS_2_METRICS_PORT: 19103
+      
DYNAMIC_CONFIG_ENABLED: 'true' + networks: + - kafka + +networks: + kafka: + name: kafka diff --git a/examples/transactions/poetry.lock b/examples/transactions/poetry.lock new file mode 100644 index 00000000..38cc6c52 --- /dev/null +++ b/examples/transactions/poetry.lock @@ -0,0 +1,355 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "aiokafka" +version = "0.12.0" +description = "Kafka integration with asyncio" +optional = false +python-versions = ">=3.9" +files = [ + {file = "aiokafka-0.12.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:da8938eac2153ca767ac0144283b3df7e74bb4c0abc0c9a722f3ae63cfbf3a42"}, + {file = "aiokafka-0.12.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a5c827c8883cfe64bc49100de82862225714e1853432df69aba99f135969bb1b"}, + {file = "aiokafka-0.12.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bea5710f7707ed12a7f8661ab38dfa80f5253a405de5ba228f457cc30404eb51"}, + {file = "aiokafka-0.12.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d87b1a45c57bbb1c17d1900a74739eada27e4f4a0b0932ab3c5a8cbae8bbfe1e"}, + {file = "aiokafka-0.12.0-cp310-cp310-win32.whl", hash = "sha256:1158e630664d9abc74d8a7673bc70dc10737ff758e1457bebc1c05890f29ce2c"}, + {file = "aiokafka-0.12.0-cp310-cp310-win_amd64.whl", hash = "sha256:06f5889acf8e1a81d6e14adf035acb29afd1f5836447fa8fa23d3cbe8f7e8608"}, + {file = "aiokafka-0.12.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ddc5308c43d48af883667e2f950a0a9739ce2c9bfe69a0b55dc234f58b1b42d6"}, + {file = "aiokafka-0.12.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ff63689cafcd6dd642a15de75b7ae121071d6162cccba16d091bcb28b3886307"}, + {file = "aiokafka-0.12.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:24633931e05a9dc80555a2f845572b6845d2dcb1af12de27837b8602b1b8bc74"}, + {file = "aiokafka-0.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:42b2436c7c69384d210e9169fbfe339d9f49dbdcfddd8d51c79b9877de545e33"}, + {file = "aiokafka-0.12.0-cp311-cp311-win32.whl", hash = "sha256:90511a2c4cf5f343fc2190575041fbc70171654ab0dae64b3bbabd012613bfa7"}, + {file = "aiokafka-0.12.0-cp311-cp311-win_amd64.whl", hash = "sha256:04c8ad27d04d6c53a1859687015a5f4e58b1eb221e8a7342d6c6b04430def53e"}, + {file = "aiokafka-0.12.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b01947553ff1120fa1cb1a05f2c3e5aa47a5378c720bafd09e6630ba18af02aa"}, + {file = "aiokafka-0.12.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e3c8ec1c0606fa645462c7353dc3e4119cade20c4656efa2031682ffaad361c0"}, + {file = "aiokafka-0.12.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:577c1c48b240e9eba57b3d2d806fb3d023a575334fc3953f063179170cc8964f"}, + {file = "aiokafka-0.12.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7b815b2e5fed9912f1231be6196547a367b9eb3380b487ff5942f0c73a3fb5c"}, + {file = "aiokafka-0.12.0-cp312-cp312-win32.whl", hash = "sha256:5a907abcdf02430df0829ac80f25b8bb849630300fa01365c76e0ae49306f512"}, + {file = "aiokafka-0.12.0-cp312-cp312-win_amd64.whl", hash = "sha256:fdbd69ec70eea4a8dfaa5c35ff4852e90e1277fcc426b9380f0b499b77f13b16"}, + {file = "aiokafka-0.12.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f9e8ab97b935ca681a5f28cf22cf2b5112be86728876b3ec07e4ed5fc6c21f2d"}, + {file = "aiokafka-0.12.0-cp313-cp313-macosx_11_0_arm64.whl", hash = 
"sha256:ed991c120fe19fd9439f564201dd746c4839700ef270dd4c3ee6d4895f64fe83"}, + {file = "aiokafka-0.12.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2c01abf9787b1c3f3af779ad8e76d5b74903f590593bc26f33ed48750503e7f7"}, + {file = "aiokafka-0.12.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:08c84b3894d97fd02fcc8886f394000d0f5ce771fab5c498ea2b0dd2f6b46d5b"}, + {file = "aiokafka-0.12.0-cp313-cp313-win32.whl", hash = "sha256:63875fed922c8c7cf470d9b2a82e1b76b4a1baf2ae62e07486cf516fd09ff8f2"}, + {file = "aiokafka-0.12.0-cp313-cp313-win_amd64.whl", hash = "sha256:bdc0a83eb386d2384325d6571f8ef65b4cfa205f8d1c16d7863e8d10cacd995a"}, + {file = "aiokafka-0.12.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a9590554fae68ec80099beae5366f2494130535a1a3db0c4fa5ccb08f37f6e46"}, + {file = "aiokafka-0.12.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6c77f5953ff4b25c889aef26df1f28df66c58db7abb7f34ecbe48502e9a6d273"}, + {file = "aiokafka-0.12.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f96d7fd8fdb5f439f7e7860fd8ec37870265d0578475e82049bce60ab07ca045"}, + {file = "aiokafka-0.12.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8ddff02b1e981083dff6d1a80d4502e0e83e0e480faf1f881766ca6f23e8d22"}, + {file = "aiokafka-0.12.0-cp39-cp39-win32.whl", hash = "sha256:4aab2767dcc8923626d8d60c314f9ba633563249cff71750db5d70b6ec813da2"}, + {file = "aiokafka-0.12.0-cp39-cp39-win_amd64.whl", hash = "sha256:7a57fda053acd1b88c87803ad0381a1d2a29d36ec561550d11ce9154972b8e23"}, + {file = "aiokafka-0.12.0.tar.gz", hash = "sha256:62423895b866f95b5ed8d88335295a37cc5403af64cb7cb0e234f88adc2dff94"}, +] + +[package.dependencies] +async-timeout = "*" +packaging = "*" +typing-extensions = ">=4.10.0" + +[package.extras] +all = ["cramjam (>=2.8.0)", "gssapi"] +gssapi = ["gssapi"] +lz4 = ["cramjam (>=2.8.0)"] +snappy = ["cramjam"] +zstd = ["cramjam"] + +[[package]] +name = "aiorun" +version = "2025.1.1" +description = "Boilerplate for asyncio applications" +optional = false +python-versions = ">=3.7" +files = [ + {file = "aiorun-2025.1.1-py3-none-any.whl", hash = "sha256:46d6fa7ac4bfe93ff8385fa17941e4dbe0452d0353497196be25b000571fe3e1"}, + {file = "aiorun-2025.1.1.tar.gz", hash = "sha256:86d1075a034ce2671ab532db06e9204fe784cdd0c66ca7b8cc47a7527d0d50a3"}, +] + +[package.extras] +dev = ["pytest", "pytest-cov"] + +[[package]] +name = "annotated-types" +version = "0.7.0" +description = "Reusable constraint types to use with typing.Annotated" +optional = false +python-versions = ">=3.8" +files = [ + {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"}, + {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, +] + +[[package]] +name = "async-timeout" +version = "5.0.1" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, + {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, +] + +[[package]] +name = "future" +version = "1.0.0" +description = "Clean single-source support for Python 3 and 2" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "future-1.0.0-py3-none-any.whl", 
hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, +] + +[[package]] +name = "kstreams" +version = "0.26.6" +description = "Build simple kafka streams applications" +optional = false +python-versions = "^3.9" +files = [] +develop = true + +[package.dependencies] +aiokafka = "<1.0" +future = "^1.0.0" +prometheus-client = "<1.0" +pydantic = ">=2.0.0,<3.0.0" +PyYAML = ">=5.4,<7.0.0" + +[package.source] +type = "directory" +url = "../.." + +[[package]] +name = "packaging" +version = "24.2" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, + {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, +] + +[[package]] +name = "prometheus-client" +version = "0.21.1" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.21.1-py3-none-any.whl", hash = "sha256:594b45c410d6f4f8888940fe80b5cc2521b305a1fafe1c58609ef715a001f301"}, + {file = "prometheus_client-0.21.1.tar.gz", hash = "sha256:252505a722ac04b0456be05c05f75f45d760c2911ffc45f2a06bcaed9f3ae3fb"}, +] + +[package.extras] +twisted = ["twisted"] + +[[package]] +name = "pydantic" +version = "2.10.6" +description = "Data validation using Python type hints" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic-2.10.6-py3-none-any.whl", hash = "sha256:427d664bf0b8a2b34ff5dd0f5a18df00591adcee7198fbd71981054cef37b584"}, + {file = "pydantic-2.10.6.tar.gz", hash = "sha256:ca5daa827cce33de7a42be142548b0096bf05a7e7b365aebfa5f8eeec7128236"}, +] + +[package.dependencies] +annotated-types = ">=0.6.0" +pydantic-core = "2.27.2" +typing-extensions = ">=4.12.2" + +[package.extras] +email = ["email-validator (>=2.0.0)"] +timezone = ["tzdata"] + +[[package]] +name = "pydantic-core" +version = "2.27.2" +description = "Core functionality for Pydantic validation and serialization" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic_core-2.27.2-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:2d367ca20b2f14095a8f4fa1210f5a7b78b8a20009ecced6b12818f455b1e9fa"}, + {file = "pydantic_core-2.27.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:491a2b73db93fab69731eaee494f320faa4e093dbed776be1a829c2eb222c34c"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7969e133a6f183be60e9f6f56bfae753585680f3b7307a8e555a948d443cc05a"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3de9961f2a346257caf0aa508a4da705467f53778e9ef6fe744c038119737ef5"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e2bb4d3e5873c37bb3dd58714d4cd0b0e6238cebc4177ac8fe878f8b3aa8e74c"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:280d219beebb0752699480fe8f1dc61ab6615c2046d76b7ab7ee38858de0a4e7"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:47956ae78b6422cbd46f772f1746799cbb862de838fd8d1fbd34a82e05b0983a"}, + {file = 
"pydantic_core-2.27.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:14d4a5c49d2f009d62a2a7140d3064f686d17a5d1a268bc641954ba181880236"}, + {file = "pydantic_core-2.27.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:337b443af21d488716f8d0b6164de833e788aa6bd7e3a39c005febc1284f4962"}, + {file = "pydantic_core-2.27.2-cp310-cp310-musllinux_1_1_armv7l.whl", hash = "sha256:03d0f86ea3184a12f41a2d23f7ccb79cdb5a18e06993f8a45baa8dfec746f0e9"}, + {file = "pydantic_core-2.27.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:7041c36f5680c6e0f08d922aed302e98b3745d97fe1589db0a3eebf6624523af"}, + {file = "pydantic_core-2.27.2-cp310-cp310-win32.whl", hash = "sha256:50a68f3e3819077be2c98110c1f9dcb3817e93f267ba80a2c05bb4f8799e2ff4"}, + {file = "pydantic_core-2.27.2-cp310-cp310-win_amd64.whl", hash = "sha256:e0fd26b16394ead34a424eecf8a31a1f5137094cabe84a1bcb10fa6ba39d3d31"}, + {file = "pydantic_core-2.27.2-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:8e10c99ef58cfdf2a66fc15d66b16c4a04f62bca39db589ae8cba08bc55331bc"}, + {file = "pydantic_core-2.27.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:26f32e0adf166a84d0cb63be85c562ca8a6fa8de28e5f0d92250c6b7e9e2aff7"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8c19d1ea0673cd13cc2f872f6c9ab42acc4e4f492a7ca9d3795ce2b112dd7e15"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:5e68c4446fe0810e959cdff46ab0a41ce2f2c86d227d96dc3847af0ba7def306"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d9640b0059ff4f14d1f37321b94061c6db164fbe49b334b31643e0528d100d99"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:40d02e7d45c9f8af700f3452f329ead92da4c5f4317ca9b896de7ce7199ea459"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c1fd185014191700554795c99b347d64f2bb637966c4cfc16998a0ca700d048"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:d81d2068e1c1228a565af076598f9e7451712700b673de8f502f0334f281387d"}, + {file = "pydantic_core-2.27.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1a4207639fb02ec2dbb76227d7c751a20b1a6b4bc52850568e52260cae64ca3b"}, + {file = "pydantic_core-2.27.2-cp311-cp311-musllinux_1_1_armv7l.whl", hash = "sha256:3de3ce3c9ddc8bbd88f6e0e304dea0e66d843ec9de1b0042b0911c1663ffd474"}, + {file = "pydantic_core-2.27.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:30c5f68ded0c36466acede341551106821043e9afaad516adfb6e8fa80a4e6a6"}, + {file = "pydantic_core-2.27.2-cp311-cp311-win32.whl", hash = "sha256:c70c26d2c99f78b125a3459f8afe1aed4d9687c24fd677c6a4436bc042e50d6c"}, + {file = "pydantic_core-2.27.2-cp311-cp311-win_amd64.whl", hash = "sha256:08e125dbdc505fa69ca7d9c499639ab6407cfa909214d500897d02afb816e7cc"}, + {file = "pydantic_core-2.27.2-cp311-cp311-win_arm64.whl", hash = "sha256:26f0d68d4b235a2bae0c3fc585c585b4ecc51382db0e3ba402a22cbc440915e4"}, + {file = "pydantic_core-2.27.2-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:9e0c8cfefa0ef83b4da9588448b6d8d2a2bf1a53c3f1ae5fca39eb3061e2f0b0"}, + {file = "pydantic_core-2.27.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:83097677b8e3bd7eaa6775720ec8e0405f1575015a463285a92bfdfe254529ef"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:172fce187655fece0c90d90a678424b013f8fbb0ca8b036ac266749c09438cb7"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:519f29f5213271eeeeb3093f662ba2fd512b91c5f188f3bb7b27bc5973816934"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:05e3a55d124407fffba0dd6b0c0cd056d10e983ceb4e5dbd10dda135c31071d6"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c3ed807c7b91de05e63930188f19e921d1fe90de6b4f5cd43ee7fcc3525cb8c"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6fb4aadc0b9a0c063206846d603b92030eb6f03069151a625667f982887153e2"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:28ccb213807e037460326424ceb8b5245acb88f32f3d2777427476e1b32c48c4"}, + {file = "pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:de3cd1899e2c279b140adde9357c4495ed9d47131b4a4eaff9052f23398076b3"}, + {file = "pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_armv7l.whl", hash = "sha256:220f892729375e2d736b97d0e51466252ad84c51857d4d15f5e9692f9ef12be4"}, + {file = "pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a0fcd29cd6b4e74fe8ddd2c90330fd8edf2e30cb52acda47f06dd615ae72da57"}, + {file = "pydantic_core-2.27.2-cp312-cp312-win32.whl", hash = "sha256:1e2cb691ed9834cd6a8be61228471d0a503731abfb42f82458ff27be7b2186fc"}, + {file = "pydantic_core-2.27.2-cp312-cp312-win_amd64.whl", hash = "sha256:cc3f1a99a4f4f9dd1de4fe0312c114e740b5ddead65bb4102884b384c15d8bc9"}, + {file = "pydantic_core-2.27.2-cp312-cp312-win_arm64.whl", hash = "sha256:3911ac9284cd8a1792d3cb26a2da18f3ca26c6908cc434a18f730dc0db7bfa3b"}, + {file = "pydantic_core-2.27.2-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:7d14bd329640e63852364c306f4d23eb744e0f8193148d4044dd3dacdaacbd8b"}, + {file = "pydantic_core-2.27.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:82f91663004eb8ed30ff478d77c4d1179b3563df6cdb15c0817cd1cdaf34d154"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:71b24c7d61131bb83df10cc7e687433609963a944ccf45190cfc21e0887b08c9"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:fa8e459d4954f608fa26116118bb67f56b93b209c39b008277ace29937453dc9"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ce8918cbebc8da707ba805b7fd0b382816858728ae7fe19a942080c24e5b7cd1"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eda3f5c2a021bbc5d976107bb302e0131351c2ba54343f8a496dc8783d3d3a6a"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd8086fa684c4775c27f03f062cbb9eaa6e17f064307e86b21b9e0abc9c0f02e"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:8d9b3388db186ba0c099a6d20f0604a44eabdeef1777ddd94786cdae158729e4"}, + {file = "pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:7a66efda2387de898c8f38c0cf7f14fca0b51a8ef0b24bfea5849f1b3c95af27"}, + {file = "pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_armv7l.whl", hash = "sha256:18a101c168e4e092ab40dbc2503bdc0f62010e95d292b27827871dc85450d7ee"}, + {file = 
"pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:ba5dd002f88b78a4215ed2f8ddbdf85e8513382820ba15ad5ad8955ce0ca19a1"}, + {file = "pydantic_core-2.27.2-cp313-cp313-win32.whl", hash = "sha256:1ebaf1d0481914d004a573394f4be3a7616334be70261007e47c2a6fe7e50130"}, + {file = "pydantic_core-2.27.2-cp313-cp313-win_amd64.whl", hash = "sha256:953101387ecf2f5652883208769a79e48db18c6df442568a0b5ccd8c2723abee"}, + {file = "pydantic_core-2.27.2-cp313-cp313-win_arm64.whl", hash = "sha256:ac4dbfd1691affb8f48c2c13241a2e3b60ff23247cbcf981759c768b6633cf8b"}, + {file = "pydantic_core-2.27.2-cp38-cp38-macosx_10_12_x86_64.whl", hash = "sha256:d3e8d504bdd3f10835468f29008d72fc8359d95c9c415ce6e767203db6127506"}, + {file = "pydantic_core-2.27.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:521eb9b7f036c9b6187f0b47318ab0d7ca14bd87f776240b90b21c1f4f149320"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:85210c4d99a0114f5a9481b44560d7d1e35e32cc5634c656bc48e590b669b145"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d716e2e30c6f140d7560ef1538953a5cd1a87264c737643d481f2779fc247fe1"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f66d89ba397d92f840f8654756196d93804278457b5fbede59598a1f9f90b228"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:669e193c1c576a58f132e3158f9dfa9662969edb1a250c54d8fa52590045f046"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9fdbe7629b996647b99c01b37f11170a57ae675375b14b8c13b8518b8320ced5"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:d262606bf386a5ba0b0af3b97f37c83d7011439e3dc1a9298f21efb292e42f1a"}, + {file = "pydantic_core-2.27.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:cabb9bcb7e0d97f74df8646f34fc76fbf793b7f6dc2438517d7a9e50eee4f14d"}, + {file = "pydantic_core-2.27.2-cp38-cp38-musllinux_1_1_armv7l.whl", hash = "sha256:d2d63f1215638d28221f664596b1ccb3944f6e25dd18cd3b86b0a4c408d5ebb9"}, + {file = "pydantic_core-2.27.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:bca101c00bff0adb45a833f8451b9105d9df18accb8743b08107d7ada14bd7da"}, + {file = "pydantic_core-2.27.2-cp38-cp38-win32.whl", hash = "sha256:f6f8e111843bbb0dee4cb6594cdc73e79b3329b526037ec242a3e49012495b3b"}, + {file = "pydantic_core-2.27.2-cp38-cp38-win_amd64.whl", hash = "sha256:fd1aea04935a508f62e0d0ef1f5ae968774a32afc306fb8545e06f5ff5cdf3ad"}, + {file = "pydantic_core-2.27.2-cp39-cp39-macosx_10_12_x86_64.whl", hash = "sha256:c10eb4f1659290b523af58fa7cffb452a61ad6ae5613404519aee4bfbf1df993"}, + {file = "pydantic_core-2.27.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ef592d4bad47296fb11f96cd7dc898b92e795032b4894dfb4076cfccd43a9308"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c61709a844acc6bf0b7dce7daae75195a10aac96a596ea1b776996414791ede4"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:42c5f762659e47fdb7b16956c71598292f60a03aa92f8b6351504359dbdba6cf"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4c9775e339e42e79ec99c441d9730fccf07414af63eac2f0e48e08fd38a64d76"}, + {file = 
"pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:57762139821c31847cfb2df63c12f725788bd9f04bc2fb392790959b8f70f118"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0d1e85068e818c73e048fe28cfc769040bb1f475524f4745a5dc621f75ac7630"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:097830ed52fd9e427942ff3b9bc17fab52913b2f50f2880dc4a5611446606a54"}, + {file = "pydantic_core-2.27.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:044a50963a614ecfae59bb1eaf7ea7efc4bc62f49ed594e18fa1e5d953c40e9f"}, + {file = "pydantic_core-2.27.2-cp39-cp39-musllinux_1_1_armv7l.whl", hash = "sha256:4e0b4220ba5b40d727c7f879eac379b822eee5d8fff418e9d3381ee45b3b0362"}, + {file = "pydantic_core-2.27.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5e4f4bb20d75e9325cc9696c6802657b58bc1dbbe3022f32cc2b2b632c3fbb96"}, + {file = "pydantic_core-2.27.2-cp39-cp39-win32.whl", hash = "sha256:cca63613e90d001b9f2f9a9ceb276c308bfa2a43fafb75c8031c4f66039e8c6e"}, + {file = "pydantic_core-2.27.2-cp39-cp39-win_amd64.whl", hash = "sha256:77d1bca19b0f7021b3a982e6f903dcd5b2b06076def36a652e3907f596e29f67"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:2bf14caea37e91198329b828eae1618c068dfb8ef17bb33287a7ad4b61ac314e"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:b0cb791f5b45307caae8810c2023a184c74605ec3bcbb67d13846c28ff731ff8"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:688d3fd9fcb71f41c4c015c023d12a79d1c4c0732ec9eb35d96e3388a120dcf3"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d591580c34f4d731592f0e9fe40f9cc1b430d297eecc70b962e93c5c668f15f"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:82f986faf4e644ffc189a7f1aafc86e46ef70372bb153e7001e8afccc6e54133"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:bec317a27290e2537f922639cafd54990551725fc844249e64c523301d0822fc"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:0296abcb83a797db256b773f45773da397da75a08f5fcaef41f2044adec05f50"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:0d75070718e369e452075a6017fbf187f788e17ed67a3abd47fa934d001863d9"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:7e17b560be3c98a8e3aa66ce828bdebb9e9ac6ad5466fba92eb74c4c95cb1151"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:c33939a82924da9ed65dab5a65d427205a73181d8098e79b6b426bdf8ad4e656"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:00bad2484fa6bda1e216e7345a798bd37c68fb2d97558edd584942aa41b7d278"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c817e2b40aba42bac6f457498dacabc568c3b7a986fc9ba7c8d9d260b71485fb"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:251136cdad0cb722e93732cb45ca5299fb56e1344a833640bf93b2803f8d1bfd"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = 
"sha256:d2088237af596f0a524d3afc39ab3b036e8adb054ee57cbb1dcf8e09da5b29cc"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:d4041c0b966a84b4ae7a09832eb691a35aec90910cd2dbe7a208de59be77965b"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:8083d4e875ebe0b864ffef72a4304827015cff328a1be6e22cc850753bfb122b"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:f141ee28a0ad2123b6611b6ceff018039df17f32ada8b534e6aa039545a3efb2"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:7d0c8399fcc1848491f00e0314bd59fb34a9c008761bcb422a057670c3f65e35"}, + {file = "pydantic_core-2.27.2.tar.gz", hash = "sha256:eb026e5a4c1fee05726072337ff51d1efb6f59090b7da90d30ea58625b1ffb39"}, +] + +[package.dependencies] +typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" + +[[package]] +name = "pyyaml" +version = "6.0.2" +description = "YAML parser and emitter for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "PyYAML-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0a9a2848a5b7feac301353437eb7d5957887edbf81d56e903999a75a3d743086"}, + {file = "PyYAML-6.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:29717114e51c84ddfba879543fb232a6ed60086602313ca38cce623c1d62cfbf"}, + {file = "PyYAML-6.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8824b5a04a04a047e72eea5cec3bc266db09e35de6bdfe34c9436ac5ee27d237"}, + {file = "PyYAML-6.0.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7c36280e6fb8385e520936c3cb3b8042851904eba0e58d277dca80a5cfed590b"}, + {file = "PyYAML-6.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec031d5d2feb36d1d1a24380e4db6d43695f3748343d99434e6f5f9156aaa2ed"}, + {file = "PyYAML-6.0.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:936d68689298c36b53b29f23c6dbb74de12b4ac12ca6cfe0e047bedceea56180"}, + {file = "PyYAML-6.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:23502f431948090f597378482b4812b0caae32c22213aecf3b55325e049a6c68"}, + {file = "PyYAML-6.0.2-cp310-cp310-win32.whl", hash = "sha256:2e99c6826ffa974fe6e27cdb5ed0021786b03fc98e5ee3c5bfe1fd5015f42b99"}, + {file = "PyYAML-6.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:a4d3091415f010369ae4ed1fc6b79def9416358877534caf6a0fdd2146c87a3e"}, + {file = "PyYAML-6.0.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:cc1c1159b3d456576af7a3e4d1ba7e6924cb39de8f67111c735f6fc832082774"}, + {file = "PyYAML-6.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:1e2120ef853f59c7419231f3bf4e7021f1b936f6ebd222406c3b60212205d2ee"}, + {file = "PyYAML-6.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5d225db5a45f21e78dd9358e58a98702a0302f2659a3c6cd320564b75b86f47c"}, + {file = "PyYAML-6.0.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5ac9328ec4831237bec75defaf839f7d4564be1e6b25ac710bd1a96321cc8317"}, + {file = "PyYAML-6.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3ad2a3decf9aaba3d29c8f537ac4b243e36bef957511b4766cb0057d32b0be85"}, + {file = "PyYAML-6.0.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:ff3824dc5261f50c9b0dfb3be22b4567a6f938ccce4587b38952d85fd9e9afe4"}, + {file = "PyYAML-6.0.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:797b4f722ffa07cc8d62053e4cff1486fa6dc094105d13fea7b1de7d8bf71c9e"}, + {file = "PyYAML-6.0.2-cp311-cp311-win32.whl", hash = 
"sha256:11d8f3dd2b9c1207dcaf2ee0bbbfd5991f571186ec9cc78427ba5bd32afae4b5"}, + {file = "PyYAML-6.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:e10ce637b18caea04431ce14fabcf5c64a1c61ec9c56b071a4b7ca131ca52d44"}, + {file = "PyYAML-6.0.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:c70c95198c015b85feafc136515252a261a84561b7b1d51e3384e0655ddf25ab"}, + {file = "PyYAML-6.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ce826d6ef20b1bc864f0a68340c8b3287705cae2f8b4b1d932177dcc76721725"}, + {file = "PyYAML-6.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f71ea527786de97d1a0cc0eacd1defc0985dcf6b3f17bb77dcfc8c34bec4dc5"}, + {file = "PyYAML-6.0.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9b22676e8097e9e22e36d6b7bda33190d0d400f345f23d4065d48f4ca7ae0425"}, + {file = "PyYAML-6.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:80bab7bfc629882493af4aa31a4cfa43a4c57c83813253626916b8c7ada83476"}, + {file = "PyYAML-6.0.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:0833f8694549e586547b576dcfaba4a6b55b9e96098b36cdc7ebefe667dfed48"}, + {file = "PyYAML-6.0.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8b9c7197f7cb2738065c481a0461e50ad02f18c78cd75775628afb4d7137fb3b"}, + {file = "PyYAML-6.0.2-cp312-cp312-win32.whl", hash = "sha256:ef6107725bd54b262d6dedcc2af448a266975032bc85ef0172c5f059da6325b4"}, + {file = "PyYAML-6.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:7e7401d0de89a9a855c839bc697c079a4af81cf878373abd7dc625847d25cbd8"}, + {file = "PyYAML-6.0.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:efdca5630322a10774e8e98e1af481aad470dd62c3170801852d752aa7a783ba"}, + {file = "PyYAML-6.0.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:50187695423ffe49e2deacb8cd10510bc361faac997de9efef88badc3bb9e2d1"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0ffe8360bab4910ef1b9e87fb812d8bc0a308b0d0eef8c8f44e0254ab3b07133"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:17e311b6c678207928d649faa7cb0d7b4c26a0ba73d41e99c4fff6b6c3276484"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:70b189594dbe54f75ab3a1acec5f1e3faa7e8cf2f1e08d9b561cb41b845f69d5"}, + {file = "PyYAML-6.0.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:41e4e3953a79407c794916fa277a82531dd93aad34e29c2a514c2c0c5fe971cc"}, + {file = "PyYAML-6.0.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:68ccc6023a3400877818152ad9a1033e3db8625d899c72eacb5a668902e4d652"}, + {file = "PyYAML-6.0.2-cp313-cp313-win32.whl", hash = "sha256:bc2fa7c6b47d6bc618dd7fb02ef6fdedb1090ec036abab80d4681424b84c1183"}, + {file = "PyYAML-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:8388ee1976c416731879ac16da0aff3f63b286ffdd57cdeb95f3f2e085687563"}, + {file = "PyYAML-6.0.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:24471b829b3bf607e04e88d79542a9d48bb037c2267d7927a874e6c205ca7e9a"}, + {file = "PyYAML-6.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7fded462629cfa4b685c5416b949ebad6cec74af5e2d42905d41e257e0869f5"}, + {file = "PyYAML-6.0.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d84a1718ee396f54f3a086ea0a66d8e552b2ab2017ef8b420e92edbc841c352d"}, + {file = "PyYAML-6.0.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9056c1ecd25795207ad294bcf39f2db3d845767be0ea6e6a34d856f006006083"}, + 
{file = "PyYAML-6.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:82d09873e40955485746739bcb8b4586983670466c23382c19cffecbf1fd8706"}, + {file = "PyYAML-6.0.2-cp38-cp38-win32.whl", hash = "sha256:43fa96a3ca0d6b1812e01ced1044a003533c47f6ee8aca31724f78e93ccc089a"}, + {file = "PyYAML-6.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:01179a4a8559ab5de078078f37e5c1a30d76bb88519906844fd7bdea1b7729ff"}, + {file = "PyYAML-6.0.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:688ba32a1cffef67fd2e9398a2efebaea461578b0923624778664cc1c914db5d"}, + {file = "PyYAML-6.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a8786accb172bd8afb8be14490a16625cbc387036876ab6ba70912730faf8e1f"}, + {file = "PyYAML-6.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d8e03406cac8513435335dbab54c0d385e4a49e4945d2909a581c83647ca0290"}, + {file = "PyYAML-6.0.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f753120cb8181e736c57ef7636e83f31b9c0d1722c516f7e86cf15b7aa57ff12"}, + {file = "PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b1fdb9dc17f5a7677423d508ab4f243a726dea51fa5e70992e59a7411c89d19"}, + {file = "PyYAML-6.0.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:0b69e4ce7a131fe56b7e4d770c67429700908fc0752af059838b1cfb41960e4e"}, + {file = "PyYAML-6.0.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:a9f8c2e67970f13b16084e04f134610fd1d374bf477b17ec1599185cf611d725"}, + {file = "PyYAML-6.0.2-cp39-cp39-win32.whl", hash = "sha256:6395c297d42274772abc367baaa79683958044e5d3835486c16da75d2a694631"}, + {file = "PyYAML-6.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:39693e1f8320ae4f43943590b49779ffb98acb81f788220ea932a6b6c51004d8"}, + {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, +] + +[[package]] +name = "typing-extensions" +version = "4.12.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +files = [ + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, +] + +[metadata] +lock-version = "2.0" +python-versions = "^3.12" +content-hash = "4d7d42d972410e2a090c46ac43f94a0728854addba96f2ad3808cad8ac1b5f72" diff --git a/examples/transactions/pyproject.toml b/examples/transactions/pyproject.toml new file mode 100644 index 00000000..a7d38d57 --- /dev/null +++ b/examples/transactions/pyproject.toml @@ -0,0 +1,19 @@ +[tool.poetry] +name = "transactions" +version = "0.1.0" +description = "" +authors = ["marcosschroh "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.12" +kstreams = { path = "../../.", develop = true } +aiorun = "^2025.1.1" + + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +app = "transactions.app:main" diff --git a/examples/transactions/scripts/cluster/start b/examples/transactions/scripts/cluster/start new file mode 100755 index 00000000..41c582ed --- /dev/null +++ b/examples/transactions/scripts/cluster/start @@ -0,0 +1,3 @@ +#!/bin/sh -e + +docker-compose up diff --git a/examples/transactions/scripts/cluster/stop b/examples/transactions/scripts/cluster/stop new file mode 100755 index 00000000..e2613397 --- /dev/null +++ b/examples/transactions/scripts/cluster/stop @@ -0,0 +1,4 @@ 
+#!/bin/sh -e
+
+docker-compose stop
+docker-compose rm --force -v
diff --git a/examples/transactions/tests/__init__.py b/examples/transactions/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/transactions/transactions/__init__.py b/examples/transactions/transactions/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/transactions/transactions/app.py b/examples/transactions/transactions/app.py
new file mode 100644
index 00000000..919c8b5f
--- /dev/null
+++ b/examples/transactions/transactions/app.py
@@ -0,0 +1,39 @@
+import asyncio
+import logging
+
+import aiorun
+
+from .engine import stream_engine
+from .producer import produce
+from .streams import consume_from_transaction, consume_json
+
+logger = logging.getLogger(__name__)
+
+
+async def start():
+    stream_engine.add_stream(consume_from_transaction)
+    stream_engine.add_stream(consume_json)
+
+    await stream_engine.start()
+    await produce(
+        topic="local--kstreams-json",
+        data={"message": "Hello world!"},
+        send=stream_engine.send,
+    )
+
+
+async def shutdown(loop: asyncio.AbstractEventLoop):
+    await stream_engine.stop()
+
+
+def main():
+    logging.basicConfig(
+        format="%(asctime)s %(message)s",
+        datefmt="%m/%d/%Y %I:%M:%S %p",
+        level=logging.INFO,
+    )
+    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/transactions/transactions/engine.py b/examples/transactions/transactions/engine.py
new file mode 100644
index 00000000..6c4a7c5e
--- /dev/null
+++ b/examples/transactions/transactions/engine.py
@@ -0,0 +1,12 @@
+from kstreams import create_engine
+from kstreams.backends import Kafka
+
+from .serializers import JsonSerializer
+
+stream_engine = create_engine(
+    title="transaction-engine",
+    serializer=JsonSerializer(),
+    backend=Kafka(
+        bootstrap_servers=["localhost:9091", "localhost:9092", "localhost:9093"],
+    ),
+)
diff --git a/examples/transactions/transactions/producer.py b/examples/transactions/transactions/producer.py
new file mode 100644
index 00000000..e5f548c8
--- /dev/null
+++ b/examples/transactions/transactions/producer.py
@@ -0,0 +1,19 @@
+import asyncio
+import logging
+
+from kstreams import Send, consts
+
+logger = logging.getLogger(__name__)
+
+
+async def produce(*, topic: str, data: dict, send: Send, total_events: int = 3) -> None:
+    for _ in range(total_events):
+        await asyncio.sleep(3)
+        await send(
+            topic,
+            value=data,
+            headers={
+                "content-type": consts.APPLICATION_JSON,
+            },
+        )
+        logger.info(f"Message sent to topic {topic} \n\n")
diff --git a/examples/transactions/transactions/serializers.py b/examples/transactions/transactions/serializers.py
new file mode 100644
index 00000000..dad2bbd1
--- /dev/null
+++ b/examples/transactions/transactions/serializers.py
@@ -0,0 +1,36 @@
+import json
+import logging
+from typing import Any, Dict, Optional
+
+from kstreams import ConsumerRecord, middleware
+from kstreams.types import Headers
+
+logger = logging.getLogger(__name__)
+
+
+json_data = {"message": "Hello world!"}
+raw_data = b"Hello world!"
+raw_topic = "local--kstreams"
+json_topic = "local--kstreams-json"
+
+
+class JsonSerializer:
+    async def serialize(
+        self,
+        payload: Any,
+        headers: Optional[Headers] = None,
+        serializer_kwargs: Optional[Dict] = None,
+    ) -> bytes:
+        """
+        Serialize a payload to json
+        """
+        value = json.dumps(payload)
+        return value.encode()
+
+
+class JsonDeserializerMiddleware(middleware.BaseMiddleware):
+    async def __call__(self, cr: ConsumerRecord):
+        if cr.value is not None:
+            data = json.loads(cr.value.decode())
+            cr.value = data
+        return await self.next_call(cr)
diff --git a/examples/transactions/transactions/streams.py b/examples/transactions/transactions/streams.py
new file mode 100644
index 00000000..04e53b41
--- /dev/null
+++ b/examples/transactions/transactions/streams.py
@@ -0,0 +1,62 @@
+import logging
+import uuid
+
+from kstreams import (
+    ConsumerRecord,
+    Stream,
+    TopicPartition,
+    middleware,
+    stream,
+)
+from kstreams.types import Send, Transactional
+
+from .serializers import JsonDeserializerMiddleware
+
+logger = logging.getLogger(__name__)
+
+
+json_data = {"message": "Hello world!"}
+raw_data = b"Hello world!"
+transactional_topic = "local--kstreams-transactional"
+json_topic = "local--kstreams-json"
+
+
+@stream(
+    transactional_topic,
+    group_id="my-group-raw-data",
+    enable_auto_commit=False,
+    isolation_level="read_committed",  # <-- This will filter aborted txn's
+)
+async def consume_from_transaction(cr: ConsumerRecord, stream: Stream):
+    logger.info(
+        f"Event consumed from topic {transactional_topic} with value: {cr.value} \n\n"
+    )
+    await stream.commit()
+
+
+@stream(
+    json_topic,
+    group_id="my-group-json-data",
+    enable_auto_commit=False,
+    # isolation_level="read_committed",  # <-- This will filter aborted txn's
+    middlewares=[middleware.Middleware(JsonDeserializerMiddleware)],
+)
+async def consume_json(cr: ConsumerRecord, send: Send, transactional: Transactional):
+    logger.info(f"Json Event consumed with offset: {cr.offset}, value: {cr.value}\n\n")
+    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
+    async with transactional(transaction_id=transaction_id) as t:
+        # send raw data to show that it is possible to send data without serialization
+        metadata = await t.send(
+            transactional_topic,
+            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
+            serializer=None,
+        )
+
+        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
+        await t.commit(offsets={tp: cr.offset + 1}, group_id="my-group-json-data")
+        # raise ValueError("This is a test error")  # uncomment to force the transaction to abort
+        logger.info(f"Message sent inside transaction with metadata: {metadata}")
+
+    # The same can be achieved using the stream engine directly:
+    # async with stream_engine.transaction(transaction_id=transaction_id) as t:
+    #     ...
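For reference, below is a minimal standalone sketch of that engine-level variant (not part of this patch). It assumes the example's `stream_engine` from `engine.py` above and a running local cluster; `sink-topic` is a placeholder topic name.

```python
# Sketch only: produce inside a transaction using the engine directly.
import asyncio

from transactions.engine import stream_engine


async def main() -> None:
    # The context manager starts a transactional producer and begins a
    # transaction; it commits on clean exit and aborts if the block raises.
    async with stream_engine.transaction(transaction_id="my-transaction-id") as t:
        # serializer=None bypasses the engine's JSON serializer and sends raw bytes
        await t.send("sink-topic", value=b"hello world", serializer=None)


if __name__ == "__main__":
    asyncio.run(main())
```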
diff --git a/kstreams/__init__.py b/kstreams/__init__.py
index 54e82471..922d5fd5 100644
--- a/kstreams/__init__.py
+++ b/kstreams/__init__.py
@@ -11,6 +11,7 @@
 from .streams import Stream, stream
 from .structs import TopicPartitionOffset
 from .test_utils import TestStreamClient
+from .transaction import Transaction
 from .types import ConsumerRecord, Send
 
 __all__ = [
@@ -31,4 +32,5 @@
     "TestStreamClient",
     "TopicPartition",
     "TopicPartitionOffset",
+    "Transaction",
 ]
diff --git a/kstreams/engine.py b/kstreams/engine.py
index 1ba85d20..774cfab3 100644
--- a/kstreams/engine.py
+++ b/kstreams/engine.py
@@ -18,7 +18,13 @@
 from .serializers import NO_DEFAULT, Deserializer, Serializer
 from .streams import Stream, StreamFunc
 from .streams import stream as stream_func
-from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
+from .transaction import Transaction, TransactionManager
+from .types import (
+    Deprecated,
+    EngineHooks,
+    Headers,
+    NextMiddlewareCall,
+)
 from .utils import encode_headers, execute_hooks
 
 logger = logging.getLogger(__name__)
@@ -81,8 +87,11 @@ def __init__(
         self.deserializer = deserializer
         self.serializer = serializer
         self.monitor = monitor
-        self._producer: typing.Optional[typing.Type[Producer]] = None
+        self._producer: typing.Optional[Producer] = None
         self._streams: typing.List[Stream] = []
+        self._transaction_manager = TransactionManager(
+            producer_class=self.producer_class, backend=self.backend, send=self.send
+        )
         self._on_startup = [] if on_startup is None else list(on_startup)
         self._on_stop = [] if on_stop is None else list(on_stop)
         self._after_startup = [] if after_startup is None else list(after_startup)
@@ -98,6 +107,7 @@ async def send(
         headers: typing.Optional[Headers] = None,
         serializer: typing.Optional[Serializer] = NO_DEFAULT,
         serializer_kwargs: typing.Optional[typing.Dict] = None,
+        producer: typing.Optional[Producer] = None,
     ):
         """
         Attributes:
@@ -111,7 +121,9 @@
                 encode the event
             serializer_kwargs Dict[str, Any] | None: Serializer kwargs
+            producer Producer | None: Producer to use instead of the engine's
+                default one, e.g. a transactional producer
         """
-        if self._producer is None:
+        producer = producer or self._producer
+
+        if producer is None:
             raise EngineNotStartedException()
 
         if serializer is NO_DEFAULT:
@@ -127,7 +139,7 @@
         if headers is not None:
             encoded_headers = encode_headers(headers)
 
-        fut = await self._producer.send(
+        fut = await producer.send(
             topic,
             value=value,
             key=key,
@@ -142,6 +154,38 @@
 
         return metadata
 
+    def transaction(
+        self,
+        transaction_id: typing.Optional[str] = None,
+    ) -> Transaction:
+        """
+        Provides a context manager to send messages transactionally.
+        It creates a transactional producer and begins a transaction;
+        the transaction is committed when the context exits cleanly,
+        or aborted if an exception is raised inside it.
+
+        Attributes:
+            transaction_id str | None: The transaction unique identifier.
+                If None, a new transaction id will be generated.
+        """
+        return self._transaction_manager.get_or_create_transaction(
+            transaction_id=transaction_id
+        )
+
     async def start(self) -> None:
         # Execute on_startup hooks
         await execute_hooks(self._on_startup)
@@ -394,6 +438,7 @@ def add_stream(
                 next_call=stream.func,
                 send=self.send,
                 stream=stream,
+                transactional=self.transaction,
             )
 
             # NOTE: When `no typing` support is deprecated this check can
@@ -408,7 +453,11 @@ def _build_stream_middleware_stack(self, *, stream: Stream) -> NextMiddlewareCall
         next_call = stream.udf_handler
         for middleware, options in reversed(middlewares):
             next_call = middleware(
-                next_call=next_call, send=self.send, stream=stream, **options
+                next_call=next_call,
+                send=self.send,
+                stream=stream,
+                transactional=self.transaction,
+                **options,
            )
 
         return next_call
diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py
index 8338a09c..fa25ee3c 100644
--- a/kstreams/middleware/middleware.py
+++ b/kstreams/middleware/middleware.py
@@ -17,6 +17,7 @@ class MiddlewareProtocol(typing.Protocol):
     next_call: types.NextMiddlewareCall
     send: types.Send
     stream: "Stream"
+    transactional: types.Transactional
 
     def __init__(
@@ -24,6 +25,7 @@
         self,
         next_call: types.NextMiddlewareCall,
         send: types.Send,
         stream: "Stream",
+        transactional: types.Transactional,
         **kwargs: typing.Any,
     ) -> None: ... 
# pragma: no cover
@@ -52,6 +54,7 @@ class BaseMiddleware:
     next_call: types.NextMiddlewareCall
     send: types.Send
     stream: "Stream"
+    transactional: types.Transactional
 
     def __init__(
@@ -59,10 +62,12 @@
         self,
         next_call: types.NextMiddlewareCall,
         send: types.Send,
         stream: "Stream",
+        transactional: types.Transactional,
     ) -> None:
         self.next_call = next_call
         self.send = send
         self.stream = stream
+        self.transactional = transactional
 
     async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
         raise NotImplementedError
diff --git a/kstreams/middleware/udf_middleware.py b/kstreams/middleware/udf_middleware.py
index 12b48f4e..447dea25 100644
--- a/kstreams/middleware/udf_middleware.py
+++ b/kstreams/middleware/udf_middleware.py
@@ -39,6 +39,7 @@ def __init__(self, *args, **kwargs) -> None:
             types.ConsumerRecord: None,
             Stream: self.stream,
             types.Send: self.send,
+            types.Transactional: self.transactional,
         }
 
     def get_type(self) -> UDFType:
diff --git a/kstreams/transaction.py b/kstreams/transaction.py
new file mode 100644
index 00000000..ca78f54d
--- /dev/null
+++ b/kstreams/transaction.py
@@ -0,0 +1,81 @@
+import logging
+import typing
+import uuid
+from functools import partial
+
+from aiokafka.structs import TopicPartition
+
+from .backends.kafka import Kafka
+from .clients import Producer
+from .types import Send
+
+logger = logging.getLogger(__name__)
+
+
+class Transaction:
+    def __init__(self, transaction_id: str, producer: Producer, send: Send) -> None:
+        self.transaction_id = transaction_id
+        self.producer = producer
+        # inject the per-transaction producer into the engine's send
+        self.send = partial(send, producer=self.producer)
+
+    async def commit(
+        self, *, offsets: dict[TopicPartition, int], group_id: str
+    ) -> None:
+        await self.producer.send_offsets_to_transaction(offsets, group_id)
+        logger.info(f"Committing offsets {offsets} for group {group_id}")
+
+    async def __aenter__(self):
+        await self.producer.start()
+        await self.producer.begin_transaction()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        if exc:
+            if self.producer._txn_manager.is_fatal_error():
+                logger.error(f"Error in transaction: {exc}")
+                # TODO: check if it is possible to
+                return
+            await self.producer.abort_transaction()
+            logger.error(f"Transaction with id {self.transaction_id} aborted")
+        else:
+            await self.producer.commit_transaction()
+            logger.info(f"Transaction with id {self.transaction_id} committed")
+
+        await self.producer.stop()
+
+
+class TransactionManager:
+    def __init__(
+        self, producer_class: typing.Type[Producer], backend: Kafka, send: Send
+    ) -> None:
+        self.backend = backend
+        self.producer_class = producer_class
+        self.send = send
+
+        # map active transaction ids to transaction objects
+        # TODO: check if a transaction can be reused
+        self._transactions: dict[str, Transaction] = {}
+
+    def get_or_create_transaction(
+        self, transaction_id: typing.Optional[str] = None
+    ) -> Transaction:
+        # A transaction_id must be unique and cannot be reused; otherwise
+        # aiokafka will raise aiokafka.errors.KafkaError:
+        # "KafkaError: Unexpected error during batch delivery"
+        transaction_id = transaction_id or str(uuid.uuid4())
+
+        if (
+            transaction_id in self._transactions
+        ):  # do .get() instead and fallback to function
+            return self._transactions[transaction_id]
+
+        producer = self._create_producer(transaction_id=transaction_id)
+        transaction = Transaction(
+            transaction_id=transaction_id, producer=producer, send=self.send
+        )
+        self._transactions[transaction_id] = transaction
+        return transaction
+
+    def 
_create_producer(self, transaction_id: str, **kwargs) -> Producer: + config = {**self.backend.model_dump(), **kwargs} + config["transactional_id"] = transaction_id + producer = self.producer_class(**config) + return producer diff --git a/kstreams/types.py b/kstreams/types.py index 90107722..c0cc75ea 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -1,10 +1,14 @@ import typing +from contextlib import AbstractAsyncContextManager from dataclasses import dataclass from aiokafka.structs import RecordMetadata +from .clients import Producer + if typing.TYPE_CHECKING: from .serializers import Serializer # pragma: no cover + from .transaction import Transaction # pragma: no cover Headers = typing.Dict[str, str] EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]] @@ -23,9 +27,17 @@ def __call__( headers: typing.Optional[Headers] = None, serializer: typing.Optional["Serializer"] = None, serializer_kwargs: typing.Optional[typing.Dict] = None, + producer: typing.Optional[Producer] = None, ) -> typing.Awaitable[RecordMetadata]: ... +class Transactional(typing.Protocol): + def __call__( + self, + transaction_id: typing.Optional[str] = None, + ) -> AbstractAsyncContextManager["Transaction", None]: ... + + D = typing.TypeVar("D") Deprecated = typing.Annotated[D, "deprecated"] diff --git a/mkdocs.yml b/mkdocs.yml index 1f863e78..8a33e186 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -33,6 +33,7 @@ nav: - Metrics: 'metrics.md' - Monitoring: 'monitoring.md' - Serialization: 'serialization.md' + - Transactions: 'transactions.md' - Testing: 'test_client.md' - Middleware: 'middleware.md' - Utils: 'utils.md' diff --git a/tests/conftest.py b/tests/conftest.py index 39fcfc1c..482e3aac 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import asyncio +import json from collections import namedtuple from dataclasses import field from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple @@ -7,7 +8,7 @@ import pytest_asyncio from pytest_httpserver import HTTPServer -from kstreams import clients, create_engine +from kstreams import ConsumerRecord, clients, create_engine, types from kstreams.utils import create_ssl_context_from_mem @@ -81,6 +82,27 @@ async def committed(self, _: TopicPartition): return 10 +class JsonSerializer: + async def serialize( + self, + payload: Any, + headers: Optional[types.Headers] = None, + serializer_kwargs: Optional[Dict] = None, + ) -> bytes: + """ + Serialize paylod to json + """ + value = json.dumps(payload) + return value.encode() + + +class JsonDeserializer: + async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> Any: + if consumer_record.value is not None: + data = consumer_record.value.decode() + return json.loads(data) + + @pytest.fixture def record_metadata(): return RecordMetadata() diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 5b05a718..5e42f0e9 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,41 +1,20 @@ -import json -from typing import Any, Dict, Optional +from typing import Dict, Optional from unittest import mock import pytest -from kstreams import ConsumerRecord, StreamEngine, consts +from kstreams import StreamEngine, consts from kstreams.clients import Producer from kstreams.streams import Stream from kstreams.test_utils.test_utils import TestStreamClient -from kstreams.types import Headers from kstreams.utils import encode_headers - -class MyJsonSerializer: - async def serialize( - self, - payload: Any, - headers: Optional[Headers] = None, - 
-        serializer_kwargs: Optional[Dict] = None,
-    ) -> bytes:
-        """
-        Serialize paylod to json
-        """
-        value = json.dumps(payload)
-        return value.encode()
-
-
-class MyJsonDeserializer:
-    async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> Any:
-        if consumer_record.value is not None:
-            data = consumer_record.value.decode()
-            return json.loads(data)
+from .conftest import JsonDeserializer, JsonSerializer
 
 
 @pytest.mark.asyncio
 async def test_send_global_serializer(stream_engine: StreamEngine, record_metadata):
-    serializer = MyJsonSerializer()
+    serializer = JsonSerializer()
     stream_engine.serializer = serializer
 
     async def async_func():
@@ -87,7 +66,7 @@ async def async_func():
         topic,
         value=value,
         headers=headers,
-        serializer=MyJsonSerializer(),
+        serializer=JsonSerializer(),
     )
 
     assert metadata
@@ -104,7 +83,7 @@ async def async_func():
 @pytest.mark.asyncio
 async def test_not_serialize_value(stream_engine: StreamEngine, record_metadata):
     # even if a serializer is set, we can send the value as is
-    stream_engine.serializer = MyJsonSerializer()
+    stream_engine.serializer = JsonSerializer()
 
     async def async_func():
         return record_metadata
@@ -153,7 +132,7 @@ async def test_consume_global_deserialization(
     Even though deserialzers are deprecated, we still support them.
     """
     topic = "local--hello-kpn"
-    stream_engine.deserializer = MyJsonDeserializer()
+    stream_engine.deserializer = JsonDeserializer()
     client = TestStreamClient(stream_engine)
 
     save_to_db = mock.Mock()
@@ -169,7 +148,7 @@ async def hello_stream(stream: Stream):
         value=value,
         headers=headers,
         key="1",
-        serializer=MyJsonSerializer(),
+        serializer=JsonSerializer(),
     )
 
     # The payload as been encoded with json,
@@ -197,7 +176,7 @@ async def test_consume_custom_deserialization(
 
     save_to_db = mock.Mock()
 
-    @stream_engine.stream(topic, deserializer=MyJsonDeserializer())
+    @stream_engine.stream(topic, deserializer=JsonDeserializer())
     async def hello_stream(stream: Stream):
         async for event in stream:
             save_to_db(event)
@@ -209,7 +188,7 @@ async def hello_stream(stream: Stream):
         value=value,
         headers=headers,
         key="1",
-        serializer=MyJsonSerializer(),
+        serializer=JsonSerializer(),
     )
 
     # The payload as been encoded with json,
diff --git a/tests/test_transactions.py b/tests/test_transactions.py
new file mode 100644
index 00000000..ef1812fd
--- /dev/null
+++ b/tests/test_transactions.py
@@ -0,0 +1,342 @@
+import asyncio
+import contextlib
+import typing
+from unittest import mock
+
+import pytest
+
+from kstreams import (
+    Consumer,
+    ConsumerRecord,
+    Producer,
+    RecordMetadata,
+    StreamEngine,
+    TopicPartition,
+    types,
+)
+from tests import TimeoutErrorException
+
+from .conftest import JsonSerializer
+
+
+@pytest.mark.parametrize("transaction_id", [None, "my-transaction-id"])
+@pytest.mark.asyncio
+async def test_create_transaction_with_stream_engine(
+    stream_engine: StreamEngine,
+    transaction_id: typing.Optional[str],
+):
+    with mock.patch.multiple(
+        Producer,
+        start=mock.DEFAULT,
+        begin_transaction=mock.DEFAULT,
+        commit_transaction=mock.DEFAULT,
+    ):
+        async with stream_engine.transaction(transaction_id=transaction_id) as t:
+            ...
+
+    if transaction_id is None:
+        assert t.transaction_id is not None
+        assert t.producer._txn_manager.transactional_id is not None
+    else:
+        assert t.transaction_id == "my-transaction-id"
+        assert t.producer._txn_manager.transactional_id == "my-transaction-id"
+
+    assert t.send is not None
+
+    t.producer.start.assert_awaited()
+    t.producer.begin_transaction.assert_awaited()
+    t.producer.commit_transaction.assert_awaited()
+
+    assert t.producer._closed
+
+
+@pytest.mark.asyncio
+async def test_create_transaction_with_stream(
+    stream_engine: StreamEngine, consumer_record_factory
+):
+    value = b"test"
+    transaction_id = "my-transaction-id"
+
+    async def getone(_):
+        return consumer_record_factory(value=value)
+
+    with (
+        mock.patch.multiple(
+            Consumer,
+            start=mock.DEFAULT,
+            subscribe=mock.DEFAULT,
+            getone=getone,
+        ),
+        mock.patch.multiple(
+            Producer,
+            start=mock.DEFAULT,
+            stop=mock.DEFAULT,
+            begin_transaction=mock.DEFAULT,
+            commit_transaction=mock.DEFAULT,
+        ),
+    ):
+
+        @stream_engine.stream("local--kstreams")
+        async def stream(
+            cr: ConsumerRecord[str, bytes], transaction: types.Transactional
+        ):
+            async with transaction(transaction_id=transaction_id) as t:
+                assert cr.value == value
+
+            assert t.transaction_id == transaction_id
+            assert t.producer._txn_manager.transactional_id == transaction_id
+
+            t.producer.start.assert_awaited()
+            t.producer.begin_transaction.assert_awaited()
+            t.producer.commit_transaction.assert_awaited()
+            t.producer.stop.assert_awaited()
+
+            await asyncio.sleep(0.1)
+
+        with contextlib.suppress(TimeoutErrorException):
+            await asyncio.wait_for(stream.start(), timeout=0.1)
+
+        await stream.stop()
+
+
+@pytest.mark.asyncio
+async def test_send_event_in_transaction(
+    stream_engine: StreamEngine, record_metadata: RecordMetadata
+):
+    async def async_func():
+        return record_metadata
+
+    with mock.patch.multiple(
+        Producer,
+        start=mock.DEFAULT,
+        begin_transaction=mock.DEFAULT,
+        commit_transaction=mock.DEFAULT,
+        send=mock.AsyncMock(return_value=async_func()),
+    ):
+        async with stream_engine.transaction() as t:
+            await t.send("sink-topic", value=b"1", key="1")
+
+    t.producer.begin_transaction.assert_awaited()
+    t.producer.commit_transaction.assert_awaited()
+    t.producer.send.assert_awaited_once_with(
+        "sink-topic",
+        value=b"1",
+        key="1",
+        partition=None,
+        timestamp_ms=None,
+        headers=None,
+    )
+
+
+@pytest.mark.asyncio
+async def test_send_event_with_global_serializer_in_transaction(
+    stream_engine: StreamEngine, record_metadata: RecordMetadata
+):
+    stream_engine.serializer = JsonSerializer()
+    value = {"value": "Hello world!!"}
+
+    async def async_func():
+        return record_metadata
+
+    with mock.patch.multiple(
+        Producer,
+        start=mock.DEFAULT,
+        begin_transaction=mock.DEFAULT,
+        commit_transaction=mock.DEFAULT,
+        send=mock.AsyncMock(return_value=async_func()),
+    ):
+        # Check that we send json data
+        async with stream_engine.transaction() as t:
+            await t.send("sink-topic", value=value, key="1")
+
+    t.producer.begin_transaction.assert_awaited()
+    t.producer.commit_transaction.assert_awaited()
+    t.producer.send.assert_awaited_once_with(
+        "sink-topic",
+        value=b'{"value": "Hello world!!"}',
+        key="1",
+        partition=None,
+        timestamp_ms=None,
+        headers=None,
+    )
+
+
+@pytest.mark.asyncio
+async def test_override_global_serializer_in_transaction(
+    stream_engine: StreamEngine, record_metadata: RecordMetadata
+):
+    stream_engine.serializer = JsonSerializer()
+
+    async def async_func():
+        return record_metadata
+
+    with mock.patch.multiple(
+        Producer,
+        start=mock.DEFAULT,
+        begin_transaction=mock.DEFAULT,
+        commit_transaction=mock.DEFAULT,
+        send=mock.AsyncMock(return_value=async_func()),
+    ):
+        # We have to set serializer=None otherwise we will get
+        # TypeError: Object of type bytes is not JSON serializable
+        async with stream_engine.transaction() as t:
+            await t.send(
+                "sink-topic", value=b"Helloooo", key="2", partition=10, serializer=None
+            )
+
+    t.producer.begin_transaction.assert_awaited()
+    t.producer.commit_transaction.assert_awaited()
+    t.producer.send.assert_awaited_once_with(
+        "sink-topic",
+        value=b"Helloooo",
+        key="2",
+        partition=10,
+        timestamp_ms=None,
+        headers=None,
+    )
+
+
+@pytest.mark.asyncio
+async def test_commit_event_in_transaction(
+    stream_engine: StreamEngine,
+    record_metadata: RecordMetadata,
+    consumer_record_factory: typing.Callable[..., ConsumerRecord],
+):
+    async def async_func():
+        return record_metadata
+
+    async def getone(_):
+        return consumer_record_factory(value=b"1")
+
+    with (
+        mock.patch.multiple(
+            Producer,
+            start=mock.DEFAULT,
+            begin_transaction=mock.DEFAULT,
+            commit_transaction=mock.DEFAULT,
+            send=mock.AsyncMock(return_value=async_func()),
+            send_offsets_to_transaction=mock.DEFAULT,
+        ),
+        mock.patch.multiple(
+            Consumer,
+            start=mock.DEFAULT,
+            subscribe=mock.DEFAULT,
+            unsubscribe=mock.DEFAULT,
+            getone=getone,
+        ),
+    ):
+
+        @stream_engine.stream("local--kstreams", group_id="test-group")
+        async def stream(
+            cr: ConsumerRecord[str, bytes], transaction: types.Transactional
+        ):
+            async with transaction() as t:
+                assert cr.value == b"1"
+                await t.send("sink-topic", value=b"1", key="1")
+
+                tp = TopicPartition(topic=cr.topic, partition=cr.partition)
+                await t.commit(offsets={tp: cr.offset + 1}, group_id="test-group")
+
+            t.producer.begin_transaction.assert_awaited()
+            t.producer.commit_transaction.assert_awaited()
+            t.producer.send.assert_awaited_once_with(
+                "sink-topic",
+                value=b"1",
+                key="1",
+                partition=None,
+                timestamp_ms=None,
+                headers=None,
+            )
+            t.producer.send_offsets_to_transaction.assert_awaited_once_with(
+                {tp: cr.offset + 1}, "test-group"
+            )
+
+            assert t.producer._closed
+            await asyncio.sleep(0.1)
+
+        with contextlib.suppress(TimeoutErrorException):
+            await asyncio.wait_for(stream.start(), timeout=0.1)
+
+        await stream.stop()
+
+
+@pytest.mark.asyncio
+async def test_abort_transaction(
+    stream_engine: StreamEngine,
+    record_metadata: RecordMetadata,
+    consumer_record_factory: typing.Callable[..., ConsumerRecord],
+):
+    async def async_func():
+        return record_metadata
+
+    async def getone(_):
+        return consumer_record_factory(value=b"1")
+
+    with (
+        mock.patch.multiple(
+            Producer,
+            start=mock.DEFAULT,
+            begin_transaction=mock.DEFAULT,
+            commit_transaction=mock.DEFAULT,
+            abort_transaction=mock.DEFAULT,
+            send=mock.AsyncMock(return_value=async_func()),
+            send_offsets_to_transaction=mock.DEFAULT,
+        ),
+        mock.patch.multiple(
+            Consumer,
+            start=mock.DEFAULT,
+            subscribe=mock.DEFAULT,
+            unsubscribe=mock.DEFAULT,
+            getone=getone,
+        ),
+        pytest.raises(ValueError) as exc,
+    ):
+
+        @stream_engine.stream("local--kstreams", group_id="test-group")
+        async def stream(
+            cr: ConsumerRecord[str, bytes], transaction: types.Transactional
+        ) -> typing.NoReturn:
+            async with transaction() as t:
+                assert cr.value == b"1"
+                await t.send("sink-topic", value=b"1", key="1")
+
+                tp = TopicPartition(topic=cr.topic, partition=cr.partition)
+                await t.commit(offsets={tp: cr.offset + 1}, group_id="test-group")
+
+                # raise exception to abort the transaction
+                raise ValueError("This is a test error")
test error") + + t.producer.begin_transaction.assert_awaited() + + # commit_transaction should not be called + t.producer.commit_transaction.assert_not_awaited() + + # abort_transaction should be called + t.producer.abort_transaction.assert_awaited() + + # The event is always produced even if the transaction is aborted + # the consumer should filter the event using + # isolation_level="read_committed" + t.producer.send.assert_awaited_once_with( + "sink-topic", + value=b"1", + key="1", + partition=None, + timestamp_ms=None, + headers=None, + ) + + # The commit is always called even if the transaction is aborted + t.producer.send_offsets_to_transaction.assert_awaited_once_with( + {tp: cr.offset + 1}, "test-group" + ) + + assert t.producer._closed + await asyncio.sleep(0.1) + + with contextlib.suppress(TimeoutErrorException): + await asyncio.wait_for(stream.start(), timeout=0.1) + + await stream.stop() + + assert exc.type is ValueError