diff --git a/statefun-docs/docs/io_module/index.rst b/statefun-docs/docs/io_module/index.rst
index 0b8680821..ec2f2999c 100644
--- a/statefun-docs/docs/io_module/index.rst
+++ b/statefun-docs/docs/io_module/index.rst
@@ -32,7 +32,7 @@ Based on the concept of Ingress (input) and Egress (output) points, and built on
 .. _ingress:
 
 Ingress
-^^^^^^^^
+=======
 
 An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions.
 It is defined via an ``IngressIdentifier`` and an ``IngressSpec``.
@@ -71,7 +71,7 @@ The spec defines the details of how to connect to the external system, which is
     spec: # ingress specific configurations
 
 Router
-""""""
+^^^^^^
 
 A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.
 Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.
@@ -101,7 +101,7 @@ Routers are bound to the system via a stateful function module, and unlike other
 .. _egress:
 
 Egress
-^^^^^^
+======
 
 Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems.
 Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``.
diff --git a/statefun-docs/docs/io_module/kafka.rst b/statefun-docs/docs/io_module/kafka.rst
index 1f4639dd4..30190043c 100644
--- a/statefun-docs/docs/io_module/kafka.rst
+++ b/statefun-docs/docs/io_module/kafka.rst
@@ -19,13 +19,14 @@ Apache Kafka
 
 Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics.
 It is based on Apache Flink's universal `Kafka connector <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html>`_ and provides exactly-once processing semantics.
+The Kafka I/O Module is configurable in Yaml or Java.
 
 .. contents:: :local:
 
 Dependency
-===========
+==========
 
-To use the Kafka I/O Module, please include the following dependency in your pom.
+To use the Kafka I/O Module in Java, please include the following dependency in your pom.
 
 .. code-block:: xml
@@ -36,10 +37,10 @@
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>statefun-kafka-io</artifactId>
        <version>{version}</version>
        <scope>provided</scope>
     </dependency>
 
-Kafka Ingress Builder
-=====================
+Kafka Ingress Spec
+==================
 
-A ``KafkaIngressBuilder`` declares an ingress spec for consuming from Kafka cluster.
+A ``KafkaIngressSpec`` declares an ingress spec for consuming from a Kafka cluster.
@@ -47,33 +48,167 @@ It accepts the following arguments:
 
 1) The ingress identifier associated with this ingress
 2) The topic name / list of topic names
 3) The address of the bootstrap servers
 4) The consumer group id to use
-5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka
+5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka (Java only)
 6) The position to start consuming from
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java
-   :language: java
-   :lines: 18-
+.. tabs::
 
-The ingress allows configuring the startup position to be one of the following:
+   .. group-tab:: Java
 
-* ``KafkaIngressStartupPosition#fromGroupOffsets()`` (default): starts from offsets that were committed to Kafka for the specified consumer group.
-* ``KafkaIngressStartupPosition#fromEarliest()``: starts from the earliest offset.
-* ``KafkaIngressStartupPosition#fromLatest()``: starts from the latest offset.
-* ``KafkaIngressStartupPosition#fromSpecificOffsets(Map)``: starts from specific offsets, defined as a map of partitions to their target starting offset.
-* ``KafkaIngressStartupPosition#fromDate(Date)``: starts from offsets that have an ingestion time larger than or equal to a specified date.
+      .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java
+         :language: java
+         :lines: 18-
 
-On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to
-start from group offsets, specific offsets, or from a date), then the ingress will fallback to using the position configured
-using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``. By default, this is set to be the latest position.
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         version: "1.0"
+
+         module:
+           meta:
+             type: remote
+           spec:
+             ingresses:
+               - ingress:
+                   meta:
+                     type: statefun.kafka.io/routable-protobuf-ingress
+                     id: example/user-ingress
+                   spec:
+                     address: kafka-broker:9092
+                     consumerGroupId: routable-kafka-e2e
+                     startupPosition:
+                       type: earliest
+                     topics:
+                       - topic: messages-1
+                         typeUrl: com.googleapis/com.company.MessageWithAddress
+                         targets:
+                           - example-namespace/my-function-1
+                           - example-namespace/my-function-2
 
 The ingress also accepts properties to directly configure the Kafka client, using ``KafkaIngressBuilder#withProperties(Properties)``.
 Please refer to the Kafka `consumer configuration <https://kafka.apache.org/documentation/#consumerconfigs>`_ documentation for the full list of available properties.
 Note that configuration passed using named methods, such as ``KafkaIngressBuilder#withConsumerGroupId(String)``, will have higher precedence and overwrite their respective settings in the provided properties.
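+
+For example, a minimal sketch (``max.poll.records`` here stands in for any Kafka consumer property):
+
+.. code-block:: none
+
+   Properties properties = new Properties();
+   properties.setProperty("max.poll.records", "500");
+
+   KafkaIngressBuilder#withProperties(properties);
+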
+Startup Position
+^^^^^^^^^^^^^^^^
+
+The ingress allows configuring the startup position to be one of the following:
+
+**From Group Offset (default)**
+
+Starts from offsets that were committed to Kafka for the specified consumer group.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaIngressStartupPosition#fromGroupOffsets();
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         startupPosition:
+           type: group-offsets
+
+**Earliest**
+
+Starts from the earliest offset.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaIngressStartupPosition#fromEarliest();
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         startupPosition:
+           type: earliest
+
+**Latest**
+
+Starts from the latest offset.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaIngressStartupPosition#fromLatest();
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         startupPosition:
+           type: latest
+
+**Specific Offsets**
+
+Starts from specific offsets, defined as a map of partitions to their target starting offset.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         Map<TopicPartition, Long> offsets = new HashMap<>();
+         offsets.put(new TopicPartition("user-topic", 0), 91L);
+         offsets.put(new TopicPartition("user-topic", 1), 11L);
+         offsets.put(new TopicPartition("user-topic", 2), 8L);
+
+         KafkaIngressStartupPosition#fromSpecificOffsets(offsets);
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         startupPosition:
+           type: specific-offsets
+           offsets:
+             - user-topic/0: 91
+             - user-topic/1: 11
+             - user-topic/2: 8
+
+**Date**
+
+Starts from offsets that have an ingestion time larger than or equal to a specified date.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         startupPosition:
+           type: date
+           date: 2020-02-01 04:15:00.00 Z
+
+On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to
+start from group offsets, specific offsets, or from a date), then the ingress will fall back to using the position configured
+using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``.
+By default, this is set to be the latest position.
+
 Kafka Deserializer
-""""""""""""""""""
+^^^^^^^^^^^^^^^^^^
 
-The Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
+When using the Java API, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
 The ``KafkaIngressDeserializer`` allows users to specify such a schema.
 The ``T deserialize(ConsumerRecord<byte[], byte[]> record)`` method gets called for each Kafka message, passing the key, value, and metadata from Kafka.
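+
+For illustration only, a minimal sketch of a JSON deserializer for a hypothetical ``User`` type could look like the following:
+
+.. code-block:: java
+
+   import com.fasterxml.jackson.databind.ObjectMapper;
+   import java.io.IOException;
+   import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+   import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+   public class UserDeserializer implements KafkaIngressDeserializer<User> {
+
+     // Jackson mapper used to read the JSON payload; User is an assumed POJO
+     private static final ObjectMapper mapper = new ObjectMapper();
+
+     @Override
+     public User deserialize(ConsumerRecord<byte[], byte[]> input) {
+       try {
+         return mapper.readValue(input.value(), User.class);
+       } catch (IOException e) {
+         throw new RuntimeException("Failed to deserialize record", e);
+       }
+     }
+   }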
@@ -90,30 +225,111 @@ It accepts the following arguments:
 
 1) The egress identifier associated with this egress
 2) The address of the bootstrap servers
-3) A ``KafkaEgressSerializer`` for serializing data into Kafka
+3) A ``KafkaEgressSerializer`` for serializing data into Kafka (Java only)
 4) The fault tolerance semantic
 5) Properties for the Kafka producer
 
-.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java
-   :language: java
-   :lines: 18-
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java
+         :language: java
+         :lines: 18-
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         version: "1.0"
+
+         module:
+           meta:
+             type: remote
+           spec:
+             egresses:
+               - egress:
+                   meta:
+                     type: statefun.kafka.io/generic-egress
+                     id: example/output-messages
+                   spec:
+                     address: kafka-broker:9092
+                     deliverySemantic:
+                       type: exactly-once
+                       transactionTimeoutMillis: 100000
+                     properties:
+                       - foo.config: bar
 
 Please refer to the Kafka `producer configuration <https://kafka.apache.org/documentation/#producerconfigs>`_ documentation for the full list of available properties.
 
 Kafka Egress and Fault Tolerance
-""""""""""""""""""""""""""""""""
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees.
-You can choose three different modes of operating based through the ``KafkaEgressBuilder``.
+You can choose one of three different modes of operation.
+
+**None**
+
+Nothing is guaranteed; produced records can be lost or duplicated.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaEgressBuilder#withNoProducerSemantics();
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         deliverySemantic:
+           type: none
+
+**At Least Once**
+
+Stateful Functions will guarantee that no records will be lost, but they can be duplicated.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaEgressBuilder#withAtLeastOnceProducerSemantics();
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
+         deliverySemantic:
+           type: at-least-once
+
+**Exactly Once**
+
+Stateful Functions uses Kafka transactions to provide exactly-once semantics.
+
+.. tabs::
+
+   .. group-tab:: Java
+
+      .. code-block:: none
+
+         KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.ofMinutes(15));
+
+   .. group-tab:: Yaml
+
+      .. code-block:: yaml
+
-* ``KafkaEgressBuilder#withNoProducerSemantics``: Nothing is guaranteed. Produced records can be lost or duplicated.
-* ``KafkaEgressBuilder#withAtLeastOnceProducerSemantics``: Stateful Functions will guarantee that nor records will be lost but they can be duplicated.
-* ``KafkaEgressBuilder#withExactlyOnceProducerSemantics``: Stateful Functions uses Kafka transactions to provide exactly-once semantics.
+         deliverySemantic:
+           type: exactly-once
+           transactionTimeoutMillis: 900000 # 15 min
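+
+Note that the transaction timeout (``transactionTimeoutMillis``, or the ``Duration`` passed in Java) cannot exceed the maximum transaction timeout allowed by your Kafka brokers, configured via ``transaction.max.timeout.ms``.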
 
 Kafka Serializer
-""""""""""""""""
+^^^^^^^^^^^^^^^^
 
-The Kafka egress needs to know how to turn Java objects into binary data.
+When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data.
 The ``KafkaEgressSerializer`` allows users to specify such a schema.
 The ``ProducerRecord<byte[], byte[]> serialize(T out)`` method gets called for each message, allowing users to set a key, value, and other metadata.
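+
+For illustration only, a matching sketch of a serializer for the same hypothetical ``User`` type, assuming it exposes a ``getUserId()`` accessor and writing to an example ``user-topic``:
+
+.. code-block:: java
+
+   import com.fasterxml.jackson.core.JsonProcessingException;
+   import com.fasterxml.jackson.databind.ObjectMapper;
+   import java.nio.charset.StandardCharsets;
+   import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+   import org.apache.kafka.clients.producer.ProducerRecord;
+
+   public class UserSerializer implements KafkaEgressSerializer<User> {
+
+     private static final ObjectMapper mapper = new ObjectMapper();
+
+     @Override
+     public ProducerRecord<byte[], byte[]> serialize(User user) {
+       try {
+         // Key by user id so all updates for a user land in the same partition
+         byte[] key = user.getUserId().getBytes(StandardCharsets.UTF_8);
+         byte[] value = mapper.writeValueAsBytes(user);
+         return new ProducerRecord<>("user-topic", key, value);
+       } catch (JsonProcessingException e) {
+         throw new RuntimeException("Failed to serialize user", e);
+       }
+     }
+   }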