diff --git a/statefun-docs/docs/_static/css/customize-theme.css b/statefun-docs/docs/_static/css/customize-theme.css index d5ca922b4..e295514c5 100644 --- a/statefun-docs/docs/_static/css/customize-theme.css +++ b/statefun-docs/docs/_static/css/customize-theme.css @@ -51,4 +51,17 @@ .scv-banner > a { color: white ! important; -} \ No newline at end of file +} + +.sphinx-tabs .sphinx-menu a.item { + color: #446D6F !important; +} + +.sphinx-tabs .sphinx-menu a.active.item { + border-color: #446D6F !important; +} + +.sphinx-tab { + border-color: #446D6F !important; +} + diff --git a/statefun-docs/docs/_templates/layout.html b/statefun-docs/docs/_templates/layout.html index 89acd5a74..05c177b6f 100644 --- a/statefun-docs/docs/_templates/layout.html +++ b/statefun-docs/docs/_templates/layout.html @@ -3,6 +3,7 @@ {%- set favicon = 'favicon.png' %} {%- set logo = 'logo.png' %} {%- set theme_logo_only = True %} +{%- set extra_css_files = ['_static/css/customize-theme.css'] %} {% block sidebartitle %} diff --git a/statefun-docs/docs/_templates/searchbox.html b/statefun-docs/docs/_templates/searchbox.html deleted file mode 100644 index e69de29bb..000000000 diff --git a/statefun-docs/docs/conf.py b/statefun-docs/docs/conf.py index d8555e9f9..7817989b6 100644 --- a/statefun-docs/docs/conf.py +++ b/statefun-docs/docs/conf.py @@ -1,3 +1,4 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -32,7 +33,8 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = ['sphinx.ext.ifconfig'] +# https://github.com/djungelorm/sphinx-tabs +extensions = ['sphinx.ext.ifconfig', 'sphinx_tabs.tabs'] # Add any paths that contain templates here, relative to this directory. 
templates_path = ['_templates'] @@ -106,8 +108,6 @@ # html_theme = 'sphinx_rtd_theme' -theme_prev_next_buttons_location = 'Top' - # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the # documentation. @@ -139,22 +139,11 @@ ] } -# Custom CSS. -html_css_files = [ - 'css/customize-theme.css', -] - -html_context = { - 'css_files': ['_static/css/customize-theme.css'] -} - - # -- Options for HTMLHelp output ------------------------------------------ # Output file base name for HTML help builder. htmlhelp_basename = 'StatefulFunctionsdoc' - # -- Options for LaTeX output --------------------------------------------- latex_elements = { @@ -180,7 +169,7 @@ # author, documentclass [howto, manual, or own class]). latex_documents = [ (master_doc, 'StatefulFunctions.tex', u'Stateful Functions Documentation', - u'Ververica GmbH', 'manual'), + u'Apache Flink', 'manual'), ] @@ -205,7 +194,6 @@ 'Miscellaneous'), ] - # -- Settings for sphinxcontrib-versioning -------------------------------- scv_greatest_tag = True scv_show_banner = True diff --git a/statefun-docs/docs/index_grid.html b/statefun-docs/docs/index_grid.html index 4f8b2bbc6..e190a3dff 100644 --- a/statefun-docs/docs/index_grid.html +++ b/statefun-docs/docs/index_grid.html @@ -49,11 +49,10 @@
- - +
- \ No newline at end of file + diff --git a/statefun-docs/docs/io_module/index.rst b/statefun-docs/docs/io_module/index.rst index e9df63ecd..6697b6568 100644 --- a/statefun-docs/docs/io_module/index.rst +++ b/statefun-docs/docs/io_module/index.rst @@ -21,6 +21,7 @@ I/O Module :hidden: kafka + kinesis source_sink custom @@ -32,70 +33,115 @@ Based on the concept of Ingress (input) and Egress (output) points, and built on .. _ingress: Ingress -^^^^^^^^ +======= An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions. -An ``IngressIdentifier`` and an ``IngressSpec`` define it. +It is defined via an ``IngressIdentifier`` and an ``IngressSpec``. An ingress identifier, similar to a function type, uniquely identifies an ingress by specifying its input type, a namespace, and a name. -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/Identifiers.java - :language: java - :lines: 18- +The spec defines the details of how to connect to the external system, which is specific to each individual I/O module. Each identifier-spec pair is bound to the system inside an stateful function module. -The spec defines the details of how to connect to the external system, which is specific to each individual I/O module. -Each identifier-spec pair is bound to the system inside an stateful function module. +.. tabs:: -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithIngress.java - :language: java - :lines: 18- + .. group-tab:: Java + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/Identifiers.java + :language: java + :lines: 18- + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithIngress.java + :language: java + :lines: 18- + + .. group-tab:: Yaml + + .. 
code-block:: Yaml + + version: "1.0" + + module: + meta: + type: remote + spec: + ingresses: + - ingress: + meta: + id: example/user-ingress + type: # ingress type + spec: # ingress specific configurations Router -"""""" +^^^^^^ A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions. +Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers. -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/UserRouter.java - :language: java - :lines: 18- +.. tabs:: -Routers are bound to the system via a stateful function module. -Unlike other components, an ingress may have any number of routers. + .. group-tab:: Java -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithRouter.java - :language: java - :lines: 18- + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/UserRouter.java + :language: java + :lines: 18- + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/ingress/ModuleWithRouter.java + :language: java + :lines: 18- + + .. group-tab:: Yaml + + When defined in ``yaml``, routers are defined by a list of function types. + The ``id`` component of the address is pulled from the key associated with each record in its underlying source implementation. + .. code-block:: Yaml + + targets: + - example-namespace/my-function-1 + - example-namespace/my-function-2 .. _egress: Egress -^^^^^^ +====== Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems. Each egress is defined using two components, an ``EgressIdentifier`` and an ``EgressSpec``. An egress identifier uniquely identifies an egress based on a namespace, name, and producing type. - -.. 
literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/Identifiers.java - :language: java - :lines: 18- - An egress spec defines the details of how to connect to the external system, the details are specific to each individual I/O module. Each identifier-spec pair are bound to the system inside a stateful function module. -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/ModuleWithEgress.java - :language: java - :lines: 18- +.. tabs:: + + .. group-tab:: Java + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/Identifiers.java + :language: java + :lines: 18- + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/ModuleWithEgress.java + :language: java + :lines: 18- + + .. group-tab:: Yaml + + .. code-block:: Yaml + + version: "1.0" + + module: + meta: + type: remote + spec: + egresses: + - egress: + meta: + id: example/user-egress + type: # egress type + spec: # egress specific configurations Stateful functions may then message an egress the same way they message another function, passing the egress identifier as function type. .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/egress/FnOutputting.java :language: java :lines: 18- - -I/O modules leverage `Java’s Service Provider Interfaces (SPI) `_ for discovery. -This means that every JAR should contain a file ``org.apache.flink.statefun.sdk.spi.StatefulFunctionModule`` in the ``META_INF/services`` resource directory that lists all available modules that it provides. - -.. code-block:: yaml - - BasicFunctionModule diff --git a/statefun-docs/docs/io_module/kafka.rst b/statefun-docs/docs/io_module/kafka.rst index 1f4639dd4..5dc7944d5 100644 --- a/statefun-docs/docs/io_module/kafka.rst +++ b/statefun-docs/docs/io_module/kafka.rst @@ -19,13 +19,14 @@ Apache Kafka Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. 
It is based on Apache Flink's universal `Kafka connector `_ and provides exactly-once processing semantics. +The Kafka I/O Module is configurable in Yaml or Java. .. contents:: :local: Dependency -=========== +========== -To use the Kafka I/O Module, please include the following dependency in your pom. +To use the Kafka I/O Module in Java, please include the following dependency in your pom. .. code-block:: xml @@ -36,10 +37,10 @@ To use the Kafka I/O Module, please include the following dependency in your pom provided -Kafka Ingress Builder -===================== +Kafka Ingress Spec +================== -A ``KafkaIngressBuilder`` declares an ingress spec for consuming from Kafka cluster. +A ``KafkaIngressSpec`` declares an ingress spec for consuming from Kafka cluster. It accepts the following arguments: @@ -47,33 +48,167 @@ It accepts the following arguments: 2) The topic name / list of topic names 3) The address of the bootstrap servers 4) The consumer group id to use -5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka +5) A ``KafkaIngressDeserializer`` for deserializing data from Kafka (Java only) 6) The position to start consuming from -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java - :language: java - :lines: 18- +.. tabs:: -The ingress allows configuring the startup position to be one of the following: + .. group-tab:: Java -* ``KafkaIngressStartupPosition#fromGroupOffsets()`` (default): starts from offsets that were committed to Kafka for the specified consumer group. -* ``KafkaIngressStartupPosition#fromEarliest()``: starts from the earliest offset. -* ``KafkaIngressStartupPosition#fromLatest()``: starts from the latest offset. -* ``KafkaIngressStartupPosition#fromSpecificOffsets(Map)``: starts from specific offsets, defined as a map of partitions to their target starting offset. 
-* ``KafkaIngressStartupPosition#fromDate(Date)``: starts from offsets that have an ingestion time larger than or equal to a specified date. + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java + :language: java + :lines: 18- -On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to -start from group offsets, specific offsets, or from a date), then the ingress will fallback to using the position configured -using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``. By default, this is set to be the latest position. + .. group-tab:: Yaml + + .. code-block:: yaml + + version: "1.0" + + module: + meta: + type: remote + spec: + ingresses: + - ingress: + meta: + type: statefun.kafka.io/routable-protobuf-ingress + id: example/user-ingress + spec: + address: kafka-broker:9092 + consumerGroupId: routable-kafka-e2e + startupPosition: + type: earliest + topics: + - topic: messages-1 + typeUrl: com.googleapis/com.company.MessageWithAddress + targets: + - example-namespace/my-function-1 + - example-namespace/my-function-2 The ingress also accepts properties to directly configure the Kafka client, using ``KafkaIngressBuilder#withProperties(Properties)``. Please refer to the Kafka `consumer configuration `_ documentation for the full list of available properties. Note that configuration passed using named methods, such as ``KafkaIngressBuilder#withConsumerGroupId(String)``, will have higher precedence and overwrite their respective settings in the provided properties. +Startup Position +^^^^^^^^^^^^^^^^ + +The ingress allows configuring the startup position to be one of the following: + +**From Group Offset (default)** + +Starts from offsets that were committed to Kafka for the specified consumer group. + +.. tabs:: + + .. group-tab:: Java + + .. 
code-block:: none + + KafkaIngressStartupPosition#fromGroupOffsets(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: group-offsets + +**Earliest** + +Starts from the earliest offset. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KafkaIngressStartupPosition#fromEarliest(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: earliest + +**Latest** + +Starts from the latest offset. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KafkaIngressStartupPosition#fromLatest(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: latest + +**Specific Offsets** + +Starts from specific offsets, defined as a map of partitions to their target starting offset. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + Map offsets = new HashMap<>(); + offsets.put(new TopicPartition("user-topic", 0), 91); + offsets.put(new TopicPartition("user-topic", 1), 11); + offsets.put(new TopicPartition("user-topic", 2), 8); + + KafkaIngressStartupPosition#fromSpecificOffsets(offsets); + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: specific-offsets + offsets: + - user-topic/0: 91 + - user-topic/1: 11 + - user-topic/2: 8 + +**Date** + +Starts from offsets that have an ingestion time larger than or equal to a specified date. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KafkaIngressStartupPosition#fromDate(ZonedDateTime.now()); + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: date + date: 2020-02-01 04:15:00.00 Z + +On startup, if the specified startup offset for a partition is out-of-range or does not exist (which may be the case if the ingress is configured to +start from group offsets, specific offsets, or from a date), then the ingress will fallback to using the position configured +using ``KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)``. 
+By default, this is set to be the latest position. + Kafka Deserializer -"""""""""""""""""" +^^^^^^^^^^^^^^^^^^ -The Kafka ingress needs to know how to turn the binary data in Kafka into Java objects. +When using the Java api, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects. The ``KafkaIngressDeserializer`` allows users to specify such a schema. The ``T deserialize(ConsumerRecord record)`` method gets called for each Kafka message, passing the key, value, and metadata from Kafka. @@ -90,30 +225,111 @@ It accepts the following arguments: 1) The egress identifier associated with this egress 2) The address of the bootstrap servers -3) A ``KafkaEgressSerializer`` for serializing data into Kafka +3) A ``KafkaEgressSerializer`` for serializing data into Kafka (Java only) 4) The fault tolerance semantic 5) Properties for the Kafka producer -.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java - :language: java - :lines: 18- +.. tabs:: + + .. group-tab:: Java + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kafka/EgressSpecs.java + :language: java + :lines: 18- + + .. group-tab:: Yaml + + .. code-block:: yaml + + version: "1.0" + + module: + meta: + type: remote + spec: + egresses: + - egress: + meta: + type: statefun.kafka.io/generic-egress + id: example/output-messages + spec: + address: kafka-broker:9092 + deliverySemantic: + type: exactly-once + transactionTimeoutMillis: 100000 + properties: + - foo.config: bar Please refer to the Kafka `producer configuration `_ documentation for the full list of available properties. Kafka Egress and Fault Tolerance -"""""""""""""""""""""""""""""""" +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. -You can choose three different modes of operating based through the ``KafkaEgressBuilder``. +You can choose three different modes of operation. 
+ +**None** + +Nothing is guaranteed, produced records can be lost or duplicated. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KafkaEgressBuilder#withNoProducerSemantics(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + deliverySemantic: + type: none + +**At Least Once** + +Stateful Functions will guarantee that no records will be lost but they can be duplicated. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KafkaEgressBuilder#withAtLeastOnceProducerSemantics(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + deliverySemantic: + type: at-least-once + +**Exactly Once** + +Stateful Functions uses Kafka transactions to provide exactly-once semantics. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.minutes(15)); + + .. group-tab:: Yaml + + .. code-block:: yaml -* ``KafkaEgressBuilder#withNoProducerSemantics``: Nothing is guaranteed. Produced records can be lost or duplicated. -* ``KafkaEgressBuilder#withAtLeastOnceProducerSemantics``: Stateful Functions will guarantee that nor records will be lost but they can be duplicated. -* ``KafkaEgressBuilder#withExactlyOnceProducerSemantics``: Stateful Functions uses Kafka transactions to provide exactly-once semantics. + deliverySemantic: + type: exactly-once + transactionTimeoutMillis: 900000 # 15 min Kafka Serializer -"""""""""""""""" +^^^^^^^^^^^^^^^^ -The Kafka egress needs to know how to turn Java objects into binary data. +When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data. The ``KafkaEgressSerializer`` allows users to specify such a schema. The ``ProducerRecord serialize(T out)`` method gets called for each message, allowing users to set a key, value, and other metadata. 
diff --git a/statefun-docs/docs/io_module/kinesis.rst b/statefun-docs/docs/io_module/kinesis.rst new file mode 100644 index 000000000..3f6ce0f9d --- /dev/null +++ b/statefun-docs/docs/io_module/kinesis.rst @@ -0,0 +1,374 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +########### +AWS Kinesis +########### + +Stateful Functions offers an AWS Kinesis I/O Module for reading from and writing to Kinesis streams. +It is based on Apache Flink's `Kinesis connector `_. +The Kinesis I/O Module is configurable in Yaml or Java. + +.. contents:: :local: + +Dependency +========== + +To use the Kinesis I/O Module in Java, please include the following dependency in your pom. + +.. code-block:: xml + + + org.apache.flink + statefun-kinesis-io + {version} + provided + + +Kinesis Ingress Spec +==================== + +A ``KinesisIngressSpec`` declares an ingress spec for consuming from Kinesis stream. + +It accepts the following arguments: + +1) The AWS region +2) An AWS credentials provider +3) A ``KinesisIngressDeserializer`` for deserializing data from Kinesis (Java only) +4) The stream start position +5) Properties for the Kinesis client +6) The name of the stream to consume from + +.. tabs:: + + .. group-tab:: Java + + .. 
literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java + :language: java + :lines: 18- + + .. group-tab:: Yaml + + .. code-block:: yaml + + version: "1.0" + + module: + meta: + type: remote + spec: + ingresses: + - ingress: + meta: + type: statefun.kinesis.io/routable-protobuf-ingress + id: example-namespace/messages + spec: + awsRegion: + type: specific + id: us-west-1 + awsCredentials: + type: basic + accessKeyId: my_access_key_id + secretAccessKey: my_secret_access_key + startupPosition: + type: earliest + streams: + - stream: stream-1 + typeUrl: com.googleapis/com.mycomp.foo.Message + targets: + - example-namespace/my-function-1 + - example-namespace/my-function-2 + - stream: stream-2 + typeUrl: com.googleapis/com.mycomp.foo.Message + targets: + - example-namespace/my-function-1 + clientConfigProperties: + - SocketTimeout: 9999 + - MaxConnections: 15 + +The ingress also accepts properties to directly configure the Kinesis client, using ``KinesisIngressBuilder#withClientConfigurationProperty()``. +Please refer to the Kinesis `client configuration `_ documentation for the full list of available properties. +Note that configuration passed using named methods will have higher precedence and overwrite their respective settings in the provided properties. + +Startup Position +^^^^^^^^^^^^^^^^ + +The ingress allows configuring the startup position to be one of the following: + +**Latest (default)** + +Start consuming from the latest position, i.e. head of the stream shards. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KinesisIngressStartupPosition#fromLatest(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: latest + +**Earliest** + +Start consuming from the earliest position possible. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KinesisIngressStartupPosition#fromEarliest(); + + .. group-tab:: Yaml + + .. 
code-block:: yaml + + startupPosition: + type: earliest + + +**Date** + +Starts from offsets that have an ingestion time larger than or equal to a specified date. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + KinesisIngressStartupPosition#fromDate(ZonedDateTime.now()) + + .. group-tab:: Yaml + + .. code-block:: yaml + + startupPosition: + type: date + date: 2020-02-01 04:15:00.00 Z + +Kinesis Deserializer +^^^^^^^^^^^^^^^^^^^^ + +The Kinesis ingress needs to know how to turn the binary data in Kinesis into Java objects. +The ``KinesisIngressDeserializer`` allows users to specify such a schema. +The ``T deserialize(IngressRecord ingressRecord)`` method gets called for each Kinesis record, passing the binary data and metadata from Kinesis. + +.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java + :language: java + :lines: 18- + +Kinesis Egress Spec +=================== + +A ``KinesisEgressBuilder`` declares an egress spec for writing data out to a Kinesis stream. + +It accepts the following arguments: + +1) The egress identifier associated with this egress +2) The AWS credentials provider +3) A ``KinesisEgressSerializer`` for serializing data into Kinesis (Java only) +4) The AWS region +5) Properties for the Kinesis client +6) The number of max outstanding records before backpressure is applied + +.. tabs:: + + .. group-tab:: Java + + .. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java + :language: java + :lines: 18- + + .. group-tab:: Yaml + + .. 
code-block:: yaml + + version: "1.0" + + module: + meta: + type: remote + spec: + egresses: + - egress: + meta: + type: statefun.kinesis.io/generic-egress + id: example/output-messages + spec: + awsRegion: + type: custom-endpoint + endpoint: https://localhost:4567 + id: us-west-1 + awsCredentials: + type: profile + profileName: john-doe + profilePath: /path/to/profile/config + maxOutstandingRecords: 9999 + clientConfigProperties: + - ThreadingModel: POOLED + - ThreadPoolSize: 10 + +Please refer to the Kinesis `client configuration `_ documentation for the full list of available properties. + +Kinesis Serializer +^^^^^^^^^^^^^^^^^^ + +The Kinesis egress needs to know how to turn Java objects into binary data. +The ``KinesisEgressSerializer`` allows users to specify such a schema. +The ``EgressRecord serialize(T value)`` method gets called for each message, allowing users to set a value, and other metadata. + +.. literalinclude:: ../../src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java + :language: java + :lines: 18- + +AWS Credentials +=============== + +Both the Kinesis ingress and egress can be configured using standard AWS credential providers. + +**Default Provider Chain (default)** + +Consults AWS’s default provider chain to determine the AWS credentials. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + AwsCredentials.fromDefaultProviderChain(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + awsCredentials: + type: default + +**Basic** + +Specifies the AWS credentials directly with provided access key ID and secret access key strings. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + AwsCredentials.basic("accessKeyId", "secretAccessKey"); + + .. group-tab:: Yaml + + .. 
code-block:: yaml + + awsCredentials: + type: basic + accessKeyId: acess-key-id + secretAccessKey: secret-access-key + +**Profile** + +Specifies the AWS credentials using an AWS configuration profile, along with the profile's configuration path. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + AwsCredentials.profile("profile-name", "/path/to/profile/config"); + + .. group-tab:: Yaml + + .. code-block:: yaml + + awsCredentials: + type: profile + profileName: profile-name + profilePath: /path/to/profile/config + +AWS Region +========== + +Both the Kinesis ingress and egress can be configured to a specific AWS region. + +**Default Provider Chain (default)** + +Consults AWS's default provider chain to determine the AWS region. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + AwsRegion.fromDefaultProviderChain(); + + .. group-tab:: Yaml + + .. code-block:: yaml + + awsRegion: + type: default + +**Specific** + +Specifies an AWS region using the region's unique id. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + AwsRegion.of("us-west-1"); + + .. group-tab:: Yaml + + .. code-block:: yaml + + awsRegion: + type: specific + id: us-west-1 + +**Custom Endpoint** + +Connects to an AWS region through a non-standard AWS service endpoint. +This is typically used only for development and testing purposes. + +.. tabs:: + + .. group-tab:: Java + + .. code-block:: none + + AwsRegion.ofCustomEndpoint("https://localhost:4567", "us-west-1"); + + .. group-tab:: Yaml + + .. code-block:: yaml + + awsRegion: + type: custom-endpoint + endpoint: https://localhost:4567 + id: us-west-1 diff --git a/statefun-docs/pom.xml b/statefun-docs/pom.xml index ea7ac1ab2..e9600cdfa 100644 --- a/statefun-docs/pom.xml +++ b/statefun-docs/pom.xml @@ -49,6 +49,11 @@ under the License. 
${project.version} provided + + org.apache.flink + statefun-kinesis-io + ${project.version} + com.fasterxml.jackson.core jackson-databind diff --git a/statefun-docs/requirements.txt b/statefun-docs/requirements.txt index 97882e2b1..3f5e986ee 100644 --- a/statefun-docs/requirements.txt +++ b/statefun-docs/requirements.txt @@ -1,5 +1,32 @@ +alabaster==0.7.12 +argh==0.26.2 +Babel==2.6.0 +certifi==2019.3.9 +chardet==3.0.4 +Click==7.0 +colorclass==2.2.0 +docutils==0.14 +idna==2.8 +imagesize==1.1.0 +Jinja2==2.10.1 +livereload==2.6.0 +MarkupSafe==1.1.1 +packaging==19.0 +pathtools==0.1.2 +port-for==0.3.1 +Pygments==2.3.1 +pyparsing==2.4.0 +pytz==2019.1 +PyYAML==5.1 +requests==2.21.0 six==1.11.0 +snowballstemmer==1.2.1 Sphinx==1.7.9 sphinx-autobuild==0.7.1 -sphinx_rtd_theme==0.4.1 +sphinx-rtd-theme==0.4.1 +sphinx-tabs==1.1.13 sphinxcontrib-versioning==2.2.1 +sphinxcontrib-websupport==1.1.0 +tornado==6.0.2 +urllib3==1.24.1 +watchdog==0.9.0 diff --git a/statefun-docs/runtime.txt b/statefun-docs/runtime.txt index 475ba515c..d70c8f8d8 100644 --- a/statefun-docs/runtime.txt +++ b/statefun-docs/runtime.txt @@ -1 +1 @@ -3.7 +3.6 diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java new file mode 100644 index 000000000..773fbce45 --- /dev/null +++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/EgressSpecs.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.docs.io.kinesis; + +import org.apache.flink.statefun.docs.models.User; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.io.EgressSpec; +import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials; +import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder; + +public class EgressSpecs { + + public static final EgressIdentifier ID = + new EgressIdentifier<>("example", "output-egress", User.class); + + public static final EgressSpec kinesisEgress = + KinesisEgressBuilder.forIdentifier(ID) + .withAwsCredentials(AwsCredentials.fromDefaultProviderChain()) + .withAwsRegion("us-west-1") + .withMaxOutstandingRecords(100) + .withClientConfigurationProperty("key", "value") + .withSerializer(UserSerializer.class) + .build(); +} diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java new file mode 100644 index 000000000..9a6d60687 --- /dev/null +++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/IngressSpecs.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.docs.io.kinesis; + +import org.apache.flink.statefun.docs.models.User; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; +import org.apache.flink.statefun.sdk.io.IngressSpec; +import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition; + +public class IngressSpecs { + + public static final IngressIdentifier ID = + new IngressIdentifier<>(User.class, "example", "input-ingress"); + + public static final IngressSpec kinesisIngress = + KinesisIngressBuilder.forIdentifier(ID) + .withAwsRegion("us-west-1") + .withAwsCredentials(AwsCredentials.fromDefaultProviderChain()) + .withDeserializer(UserDeserializer.class) + .withStream("stream-name") + .withStartupPosition(KinesisIngressStartupPosition.fromEarliest()) + .withClientConfigurationProperty("key", "value") + .build(); +} diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java new file mode 100644 index 000000000..27669c6a9 --- /dev/null +++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserDeserializer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.docs.io.kinesis; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.apache.flink.statefun.docs.models.User; +import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UserDeserializer implements KinesisIngressDeserializer { + + private static Logger LOG = LoggerFactory.getLogger(UserDeserializer.class); + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public User deserialize(IngressRecord ingressRecord) { + try { + return mapper.readValue(ingressRecord.getData(), User.class); + } catch (IOException e) { + LOG.debug("Failed to deserialize record", e); + return null; + } + } +} diff --git a/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java new file mode 100644 index 000000000..f145ba69b --- /dev/null +++ b/statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kinesis/UserSerializer.java @@ -0,0 +1,49 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.docs.io.kinesis; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.apache.flink.statefun.docs.models.User; +import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord; +import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UserSerializer implements KinesisEgressSerializer { + + private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class); + + private static final String STREAM = "user-stream"; + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public EgressRecord serialize(User value) { + try { + return EgressRecord.newBuilder() + .withPartitionKey(value.getUserId()) + .withData(mapper.writeValueAsBytes(value)) + .withStream(STREAM) + .build(); + } catch (IOException e) { + LOG.info("Failed to serializer user", e); + return null; + } + } +}