Commit
[FLINK-16557][docs] Document YAML-ized Kafka egresses / ingresses in Stateful Functions documentation

This closes #63.
sjwiesman authored and tzulitai committed Mar 22, 2020
1 parent 7fcdae2 commit b3a9ea1
Showing 2 changed files with 250 additions and 34 deletions.
6 changes: 3 additions & 3 deletions statefun-docs/docs/io_module/index.rst
@@ -32,7 +32,7 @@ Based on the concept of Ingress (input) and Egress (output) points, and built on
.. _ingress:

Ingress
=======

An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions.
It is defined via an ``IngressIdentifier`` and an ``IngressSpec``.
@@ -71,7 +71,7 @@ The spec defines the details of how to connect to the external system, which is
spec: # ingress specific configurations
Router
^^^^^^

A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.
Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.
@@ -101,7 +101,7 @@ Routers are bound to the system via a stateful function module, and unlike other
.. _egress:

Egress
======

Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems.
Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``.
278 changes: 247 additions & 31 deletions statefun-docs/docs/io_module/kafka.rst
@@ -19,13 +19,14 @@ Apache Kafka

Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics.
It is based on Apache Flink's universal `Kafka connector <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html>`_ and provides exactly-once processing semantics.
The Kafka I/O Module is configurable in YAML or Java.

.. contents:: :local:

Dependency
==========

To use the Kafka I/O Module in Java, please include the following dependency in your pom.

.. code-block:: xml
@@ -36,44 +36,178 @@ To use the Kafka I/O Module, please include the following dependency in your pom
<scope>provided</scope>
</dependency>
Kafka Ingress Spec
==================

A ``KafkaIngressSpec`` declares an ingress spec for consuming from a Kafka cluster.

It accepts the following arguments:

1) The ingress identifier associated with this ingress
2) The topic name / list of topic names
3) The address of the bootstrap servers
4) The consumer group id to use
5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka (Java only)
6) The position to start consuming from

.. tabs::

    .. group-tab:: Java

        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java
            :language: java
            :lines: 18-
    .. group-tab:: Yaml

        .. code-block:: yaml

            version: "1.0"

            module:
              meta:
                type: remote
              spec:
                ingresses:
                  - ingress:
                      meta:
                        type: statefun.kafka.io/routable-protobuf-ingress
                        id: example/user-ingress
                      spec:
                        address: kafka-broker:9092
                        consumerGroupId: routable-kafka-e2e
                        startupPosition:
                          type: earliest
                        topics:
                          - topic: messages-1
                            typeUrl: com.googleapis/com.company.MessageWithAddress
                            targets:
                              - example-namespace/my-function-1
                              - example-namespace/my-function-2

The ingress also accepts properties to directly configure the Kafka client, using ``KafkaIngressBuilder#withProperties(Properties)``.
Please refer to the Kafka `consumer configuration <https://docs.confluent.io/current/installation/configuration/consumer-configs.html>`_ documentation for the full list of available properties.
Note that configuration passed using named methods, such as ``KafkaIngressBuilder#withConsumerGroupId(String)``, will have higher precedence and overwrite their respective settings in the provided properties.
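For illustration, client properties could be attached to a YAML ingress spec as sketched below. This fragment is hypothetical: the ``properties`` list shape mirrors the egress example elsewhere in this document, and the property keys and values are standard Kafka consumer settings chosen purely as examples, not defaults.

```yaml
# Hypothetical ingress spec fragment; the property names are ordinary Kafka
# consumer settings, and the values are illustrative only.
spec:
  address: kafka-broker:9092
  consumerGroupId: my-group
  properties:
    - fetch.min.bytes: "1048576"
    - max.poll.records: "500"
```

As with the Java builder, settings that are also configurable through dedicated fields (for example the consumer group id) would be governed by the dedicated field rather than the raw property.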

Startup Position
^^^^^^^^^^^^^^^^

The ingress allows configuring the startup position to be one of the following:

**From Group Offset (default)**

Starts from offsets that were committed to Kafka for the specified consumer group.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaIngressStartupPosition#fromGroupOffsets();

    .. group-tab:: Yaml

        .. code-block:: yaml

            startupPosition:
              type: group-offsets

**Earliest**

Starts from the earliest offset.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaIngressStartupPosition#fromEarliest();

    .. group-tab:: Yaml

        .. code-block:: yaml

            startupPosition:
              type: earliest

**Latest**

Starts from the latest offset.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaIngressStartupPosition#fromLatest();

    .. group-tab:: Yaml

        .. code-block:: yaml

            startupPosition:
              type: latest

**Specific Offsets**

Starts from specific offsets, defined as a map of partitions to their target starting offset.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            Map<TopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new TopicPartition("user-topic", 0), 91L);
            offsets.put(new TopicPartition("user-topic", 11), 11L);
            offsets.put(new TopicPartition("user-topic", 8), 8L);

            KafkaIngressStartupPosition#fromSpecificOffsets(offsets);

    .. group-tab:: Yaml

        .. code-block:: yaml

            startupPosition:
              type: specific-offsets
              offsets:
                - user-topic/0: 91
                - user-topic/1: 11
                - user-topic/2: 8

**Date**

Starts from offsets that have an ingestion time larger than or equal to a specified date.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());

    .. group-tab:: Yaml

        .. code-block:: yaml

            startupPosition:
              type: date
              date: 2020-02-01 04:15:00.00 Z

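The Java variant of this startup position takes a ``ZonedDateTime``. As a self-contained illustration of how a timestamp in the YAML format above can be represented with ``java.time``, here is a minimal sketch; the formatter pattern is an assumption made for this example, not part of the StateFun API.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class StartupDate {
    public static void main(String[] args) {
        // Formatter mirroring the YAML example "2020-02-01 04:15:00.00 Z" (illustrative).
        DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SS X");
        ZonedDateTime startupDate = ZonedDateTime.parse("2020-02-01 04:15:00.00 Z", format);

        // Such an instant could then be handed to KafkaIngressStartupPosition#fromDate.
        System.out.println(startupDate.getYear()); // prints 2020
    }
}
```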
On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to
start from group offsets, specific offsets, or from a date), then the ingress will fall back to the position configured
using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``.
By default, this is set to the latest position.

Kafka Deserializer
^^^^^^^^^^^^^^^^^^

When using the Java API, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects.
The ``KafkaIngressDeserializer`` allows users to specify such a schema.
The ``T deserialize(ConsumerRecord<byte[], byte[]> record)`` method gets called for each Kafka message, passing the key, value, and metadata from Kafka.
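As a sketch of what such a deserializer can look like, the following turns each record value into a UTF-8 string. To keep the example self-contained and runnable, minimal stand-ins are defined here for Kafka's ``ConsumerRecord`` and the SDK's ``KafkaIngressDeserializer`` interface; in a real module those types come from kafka-clients and the StateFun SDK instead.

```java
import java.nio.charset.StandardCharsets;

public class DeserializerSketch {
    // Stand-in for org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]>.
    static class ConsumerRecord {
        final byte[] key;
        final byte[] value;
        ConsumerRecord(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the SDK's KafkaIngressDeserializer<T>.
    interface KafkaIngressDeserializer<T> {
        T deserialize(ConsumerRecord record);
    }

    // Deserializer that interprets each record value as a UTF-8 string.
    static class StringDeserializer implements KafkaIngressDeserializer<String> {
        @Override
        public String deserialize(ConsumerRecord record) {
            return new String(record.value, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ConsumerRecord record =
            new ConsumerRecord(null, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new StringDeserializer().deserialize(record)); // prints "hello"
    }
}
```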

@@ -90,30 +225,111 @@ It accepts the following arguments:

1) The egress identifier associated with this egress
2) The address of the bootstrap servers
3) A ``KafkaEgressSerializer`` for serializing data into Kafka (Java only)
4) The fault tolerance semantic
5) Properties for the Kafka producer

.. tabs::

    .. group-tab:: Java

        .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java
            :language: java
            :lines: 18-

    .. group-tab:: Yaml

        .. code-block:: yaml

            version: "1.0"

            module:
              meta:
                type: remote
              spec:
                egresses:
                  - egress:
                      meta:
                        type: statefun.kafka.io/generic-egress
                        id: example/output-messages
                      spec:
                        address: kafka-broker:9092
                        deliverySemantic:
                          type: exactly-once
                          transactionTimeoutMillis: 100000
                        properties:
                          - foo.config: bar

Please refer to the Kafka `producer configuration <https://docs.confluent.io/current/installation/configuration/producer-configs.html>`_ documentation for the full list of available properties.

Kafka Egress and Fault Tolerance
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees.
You can choose three different modes of operation.

**None**

Nothing is guaranteed, produced records can be lost or duplicated.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaEgressBuilder#withNoProducerSemantics();

    .. group-tab:: Yaml

        .. code-block:: yaml

            deliverySemantic:
              type: none

**At Least Once**

Stateful Functions will guarantee that no records will be lost but they can be duplicated.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaEgressBuilder#withAtLeastOnceProducerSemantics();

    .. group-tab:: Yaml

        .. code-block:: yaml

            deliverySemantic:
              type: at-least-once

**Exactly Once**

Stateful Functions uses Kafka transactions to provide exactly-once semantics.

.. tabs::

    .. group-tab:: Java

        .. code-block:: none

            KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.minutes(15));

    .. group-tab:: Yaml

        .. code-block:: yaml

            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 900000 # 15 min

Kafka Serializer
^^^^^^^^^^^^^^^^

When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data.
The ``KafkaEgressSerializer`` allows users to specify such a schema.
The ``ProducerRecord<byte[], byte[]> serialize(T out)`` method gets called for each message, allowing users to set a key, value, and other metadata.
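Mirroring the deserializer sketch above in spirit, the following self-contained example serializes each outgoing string to UTF-8 bytes for a fixed topic. The ``ProducerRecord`` and ``KafkaEgressSerializer`` types are minimal stand-ins defined here so the sketch runs on its own; the real types come from kafka-clients and the StateFun SDK, and the topic name is purely illustrative.

```java
import java.nio.charset.StandardCharsets;

public class SerializerSketch {
    // Stand-in for org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>.
    static class ProducerRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        ProducerRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the SDK's KafkaEgressSerializer<T>.
    interface KafkaEgressSerializer<T> {
        ProducerRecord serialize(T out);
    }

    // Serializer that writes each message to a fixed topic as UTF-8 bytes.
    static class StringSerializer implements KafkaEgressSerializer<String> {
        @Override
        public ProducerRecord serialize(String out) {
            byte[] bytes = out.getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord("greetings", bytes, bytes);
        }
    }

    public static void main(String[] args) {
        ProducerRecord record = new StringSerializer().serialize("hello");
        System.out.println(record.topic); // prints "greetings"
    }
}
```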

