Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-16700][docs] Document Kinesis I/O Module #65

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions statefun-docs/docs/io_module/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ I/O Module
:hidden:

kafka
kinesis
source_sink
custom

Expand Down
374 changes: 374 additions & 0 deletions statefun-docs/docs/io_module/kinesis.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

###########
AWS Kinesis
###########

Stateful Functions offers an AWS Kinesis I/O Module for reading from and writing to Kinesis streams.
It is based on Apache Flink's `Kinesis connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kinesis.html>`_.
The Kinesis I/O Module is configurable in Yaml or Java.

.. contents:: :local:

Dependency
==========

To use the Kinesis I/O Module in Java, please include the following dependency in your pom.

.. code-block:: xml

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-kinesis-io</artifactId>
<version>{version}</version>
<scope>provided</scope>
</dependency>

Kinesis Ingress Spec
====================

A ``KinesisIngressSpec`` declares an ingress spec for consuming from Kinesis stream.

It accepts the following arguments:

1) The AWS region
2) An AWS credentials provider
3) A ``KinesisIngressDeserializer`` for deserializing data from Kinesis (Java only)
4) The stream start position
5) Properties for the Kinesis client
6) The name of the stream to consume from

.. tabs::

.. group-tab:: Java

.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java
:language: java
:lines: 18-

.. group-tab:: Yaml

.. code-block:: yaml

version: "1.0"

module:
meta:
type: remote
spec:
ingresses:
- ingress:
Comment on lines +72 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation is off:

Suggested change
ingresses:
- ingress:
ingresses:
- ingress:

meta:
type: statefun.kinesis.io/routable-protobuf-ingress
id: example-namespace/messages
spec:
awsRegion:
type: specific
id: us-west-1
awsCredentials:
type: basic
accessKeyId: my_access_key_id
secretAccessKey: my_secret_access_key
startupPosition:
type: earliest
streams:
- stream: stream-1
typeUrl: com.googleapis/com.mycomp.foo.Message
targets:
Comment on lines +88 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- stream: stream-1
typeUrl: com.googleapis/com.mycomp.foo.Message
targets:
- stream: stream-1
typeUrl: com.googleapis/com.mycomp.foo.Message
targets:

- example-namespace/my-function-1
- example-namespace/my-function-2
- stream: stream-2
typeUrl: com.googleapis/com.mycomp.foo.Message
targets:
- example-namespace/my-function-1
clientConfigProperties:
- SocketTimeout: 9999
- MaxConnections: 15

The ingress also accepts properties to directly configure the Kinesis client, using ``KinesisIngressBuilder#withClientConfigurationProperty()``.
Please refer to the Kinesis `client configuration <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html>`_ documentation for the full list of available properties.
Note that configuration passed using named methods will have higher precedence and overwrite their respective settings in the provided properties.

Startup Position
^^^^^^^^^^^^^^^^

The ingress allows configuring the startup position to be one of the following:

**Latest (default)**

Start consuming from the latest position, i.e. head of the stream shards.

.. tabs::

.. group-tab:: Java

.. code-block:: none

KinesisIngressStartupPosition#fromLatest();

.. group-tab:: Yaml

.. code-block:: yaml

startupPosition:
type: latest

**Earliest**

Start consuming from the earliest position possible.

.. tabs::

.. group-tab:: Java

.. code-block:: none

KinesisIngressStartupPosition#fromEarliest();

.. group-tab:: Yaml

.. code-block:: yaml

startupPosition:
type: earliest


**Date**

Starts from offsets that have an ingestion time larger than or equal to a specified date.

.. tabs::

.. group-tab:: Java

.. code-block:: none

KinesisIngressStartupPosition#fromDate(ZonedDateTime.now())

.. group-tab:: Yaml

.. code-block:: yaml

startupPosition:
type: date
date: 2020-02-01 04:15:00.00 Z

Kinesis Deserializer
^^^^^^^^^^^^^^^^^^^^

The Kinesis ingress needs to know how to turn the binary data in Kinesis into Java objects.
The ``KinesisIngressDeserializer`` allows users to specify such a schema.
The ``T deserialize(IngressRecord ingressRecord)`` method gets called for each Kinesis record, passing the binary data and metadata from Kinesis.

.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java
:language: java
:lines: 18-

Kinesis Egress Spec
===================

A ``KinesisEgressBuilder`` declares an egress spec for writing data out to a Kinesis stream.

It accepts the following arguments:

1) The egress identifier associated with this egress
2) The AWS credentials provider
3) A ``KinesisEgressSerializer`` for serializing data into Kinesis (Java only)
4) The AWS region
5) Properties for the Kinesis client
6) The number of max outstanding records before backpressure is applied

.. tabs::

.. group-tab:: Java

.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java
:language: java
:lines: 18-

.. group-tab:: Yaml

.. code-block:: yaml

version: "1.0"

module:
meta:
type: remote
spec:
egresses:
- egress:
meta:
type: statefun.kinesis.io/generic-egress
id: example/output-messages
spec:
awsRegion:
type: custom-endpoint
endpoint: https://localhost:4567
id: us-west-1
awsCredentials:
type: profile
profileName: john-doe
profilePath: /path/to/profile/config
maxOutstandingRecords: 9999
clientConfigProperties:
- ThreadingModel: POOLED
- ThreadPoolSize: 10

Please refer to the Kinesis `client configuration <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html>`_ documentation for the full list of available properties.

Kinesis Serializer
^^^^^^^^^^^^^^^^^^

The Kinesis egress needs to know how to turn Java objects into binary data.
The ``KinesisEgressSerializer`` allows users to specify such a schema.
The ``EgressRecord serialize(T value)`` method gets called for each message, allowing users to set a value, and other metadata.

.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java
:language: java
:lines: 18-

AWS Credentials
===============

Both the Kinesis ingress and egress can be configured using standard AWS credential providers.

**Default Provider Chain (default)**

Consults AWS’s default provider chain to determine the AWS credentials.

.. tabs::

.. group-tab:: Java

.. code-block:: none

AwsCredentials.fromDefaultProviderChain();

.. group-tab:: Yaml

.. code-block:: yaml

awsCredentials:
type: default

**Basic**

Specifies the AWS credentials directly with provided access key ID and secret access key strings.

.. tabs::

.. group-tab:: Java

.. code-block:: none

AwsCredentials.basic("accessKeyId", "secretAccessKey");

.. group-tab:: Yaml

.. code-block:: yaml

awsCredentials:
type: basic
accessKeyId: acess-key-id
secretAccessKey: secret-access-key

**Profile**

Specifies the AWS credentials using an AWS configuration profile, along with the profile's configuration path.

.. tabs::

.. group-tab:: Java

.. code-block:: none

AwsCredentials.profile("profile-name", "/path/to/profile/config");

.. group-tab:: Yaml

.. code-block:: yaml

awsCredentials:
type: profile
profileName: profile-name
profilePath: /path/to/profile/config

AWS Region
==========

Both the Kinesis ingress and egress can be configured to a specific AWS region.

**Default Provider Chain (default)**

Consults AWS's default provider chain to determine the AWS region.

.. tabs::

.. group-tab:: Java

.. code-block:: none

AwsRegion.fromDefaultProviderChain();

.. group-tab:: Yaml

.. code-block:: yaml

awsRegion:
type: default

**Specific**

Specifies an AWS region using the region's unique id.

.. tabs::

.. group-tab:: Java

.. code-block:: none

AwsRegion.of("us-west-1");

.. group-tab:: Yaml

.. code-block:: yaml

awsRegion:
type: specific
id: us-west-1

**Custom Endpoint**

Connects to an AWS region through a non-standard AWS service endpoint.
This is typically used only for development and testing purposes.

.. tabs::

.. group-tab:: Java

.. code-block:: none

AwsRegion.ofCustomEndpoint("https://localhost:4567", "us-west-1");

.. group-tab:: Yaml

.. code-block:: yaml

awsRegion:
type: custom-endpoint
endpoint: https://localhost:4567
id: us-west-1
5 changes: 5 additions & 0 deletions statefun-docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-kinesis-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Loading