To use the AWS Kinesis Binder, add it to your Spring Cloud Stream application with the following Maven coordinates:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
</dependency>
Note
|
Starting with version 4.0, this project is fully based on AWS SDK v2 and therefore introduces many breaking changes and incompatibilities with the previous version.
|
The Spring Cloud Stream Binder for AWS Kinesis provides the binding implementation for Spring Cloud Stream. This implementation uses Spring Integration AWS Kinesis channel adapters at its foundation and maps each configured destination to an AWS Kinesis stream.
Unlike Apache Kafka, AWS Kinesis does not provide out-of-the-box support for consumer groups.
Support for this feature is implemented as part of the MetadataStore key for shard checkpoints in the KinesisMessageDrivenChannelAdapter: [CONSUMER_GROUP]:[STREAM]:[SHARD_ID].
In addition, the LockRegistry is used to ensure exclusive access to each shard.
This way, only one channel adapter in the same consumer group consumes messages from a single shard in the stream it is configured for.
The partitioning logic in AWS Kinesis is similar to that of Apache Kafka, but with slightly different semantics.
The partitionKey on the producer side determines which shard in the stream the data record is assigned to.
Partition keys are Unicode strings with a maximum length limit of 256 characters for each key.
AWS Kinesis uses the partition key as input to a hash function that maps the partition key and associated data to a specific shard.
Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards.
As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream.
At the same time, you cannot explicitly select the target shard to send to.
However, by calculating the hash manually (and using explicitHashKeyExpression on the producer, respectively), you can track the target shard by checking for inclusion in its HashKeyRange.
By default, the partition key is the hash code (Object.hashCode()) of the message payload.
The Spring Cloud Stream partition handling logic is excluded in the case of the AWS Kinesis Binder, since it is not applicable: the provided producer.partitionKeyExpression is propagated directly to the KinesisMessageHandler.
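For example, a partition key derived from a message header might be configured like this (the binding name output-out-0 and the header name are illustrative, not mandated by this project):
spring.cloud.stream.bindings.output-out-0.producer.partitionKeyExpression=headers['partitionKey']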
On the consumer side, the instanceCount and instanceIndex are used to distribute shards evenly between consumers in a group.
This has an effect only for the regular KinesisMessageDrivenChannelAdapter, which can assign specific shards to the target Kinesis consumer.
In Spring Cloud Stream, consumer groups are implemented with a focus on high availability, message ordering, and guaranteed message delivery.
The consumer group abstraction ensures that each message is handled by a single consumer.
To have a highly available consumer group for your Kinesis stream:
- Ensure all instances of your consumer applications use a shared DynamoDbMetadataStore and DynamoDbLockRegistry (see below for configuration options).
- Use the same group name for the channel in all application instances via the spring.cloud.stream.bindings.<bindingTarget>.group property.
These configurations alone guarantee HA, message ordering, and guaranteed message delivery. However, even distribution across instances is not guaranteed as of now: there is a very high chance that a single instance in a consumer group will pick up all the shards for consuming. When that instance goes down (for example, it cannot send a heartbeat for any reason), another instance in the consumer group starts processing from the last checkpoint of the previous consumer (for shard iterator type TRIM_HORIZON).
Configuring consumer concurrency is therefore important for throughput; it can be set via the spring.cloud.stream.bindings.<bindingTarget>.consumer.concurrency property.
It is possible to evenly distribute shards across all instances within a single consumer group, as shown in the example after this list. This is done by configuring:
- spring.cloud.stream.instanceCount= the number of instances
- spring.cloud.stream.instanceIndex= the current instance's index
The only way to achieve HA in this case is that, when the instance processing a particular shard goes down, another instance must be started with spring.cloud.stream.instanceIndex set to the failed instance's index in order to resume processing those shards.
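For example, the first of two instances in such a consumer group might be configured as follows (the binding and group names are illustrative):
spring.cloud.stream.bindings.input-in-0.group=my-consumer-group
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=2
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
The second instance would use the same configuration with spring.cloud.stream.instanceIndex=1.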
This section contains settings specific to the Kinesis Binder and bound channels.
For general binding configuration options and properties, please refer to the Spring Cloud Stream core documentation.
Also, the Kinesis Binder exposes KinesisAsyncClient, DynamoDbAsyncClient, and CloudWatchAsyncClient beans into its child application context.
By default, these beans rely on the auto-configuration from Spring Cloud AWS.
See the CredentialsProperties, RegionProperties, and AwsClientProperties extensions for more information.
The following properties are available for Kinesis Binder configuration; they start with the spring.cloud.stream.kinesis.binder. prefix:
- headers: The set of custom headers to transfer over AWS Kinesis. Default: "correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType".
- describeStreamBackoff: The amount of time, in milliseconds, between retries for the DescribeStream operation. Default: 1000.
- describeStreamRetries: The number of times the consumer will retry a DescribeStream operation while waiting for the stream to be in the ACTIVE state. Default: 50.
- autoCreateStream: If set to true, the binder creates the stream automatically. If set to false, the binder relies on the stream being already created. Default: true.
- autoAddShards: If set to true, the binder creates new shards automatically. If set to false, the binder relies on the shard count of the stream being already configured. If the shard count of the target stream is smaller than the expected value, the binder ignores that value. Default: false.
- minShardCount: Effective only if autoAddShards is set to true. The minimum number of shards that the binder configures on the stream from which it produces/consumes data. It can be superseded by the partitionCount setting of the producer or by the value of the instanceCount * concurrency settings of the producer (if either is larger). Default: 1.
- kplKclEnabled: Enable the usage of the Kinesis Client Library / Kinesis Producer Library for all message consumption and production. Default: false.
- enableObservation: Enable Micrometer Observation instrumentation on the Kinesis Binder producer and consumer. By default, the Kinesis Binder propagates traces from producer to consumer via embedded headers. The standard W3C traceparent and Brave-pattern X-B3* headers are mapped by default. The Kinesis Binder observation is fully based on Spring Integration conventions. Default: false.
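As an illustration, a binder that auto-creates its stream with at least two shards, without KPL/KCL, might be configured as follows (the values are examples only):
spring.cloud.stream.kinesis.binder.autoCreateStream=true
spring.cloud.stream.kinesis.binder.autoAddShards=true
spring.cloud.stream.kinesis.binder.minShardCount=2
spring.cloud.stream.kinesis.binder.kplKclEnabled=false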
Support for consumer groups is implemented using the DynamoDbMetadataStore.
The partitionKey name used in the table is metadataKey; this is not configurable.
DynamoDB checkpoint properties are prefixed with spring.cloud.stream.kinesis.binder.checkpoint.:
- table: The name to give the DynamoDB table. Default: SpringIntegrationMetadataStore.
- createDelay: The amount of time, in seconds, between polling attempts while waiting for the checkpoint DynamoDB table to be created. Default: 1.
- createRetries: The number of times the consumer will poll DynamoDB while waiting for the checkpoint table to be created. Default: 25.
- billingMode: The billing mode of the DynamoDB table. See DynamoDB On-Demand Mode. Possible values are provisioned and payPerRequest. If left empty or set to payPerRequest, both readCapacity and writeCapacity are ignored. Default: payPerRequest.
- readCapacity: The read capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned. Default: 1.
- writeCapacity: The write capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned. Default: 1.
- timeToLive: A period, in seconds, for item expiration. See DynamoDB TTL. No default, which means no record expiration.
The LockRegistry is used to ensure exclusive access to each shard, so that only one channel adapter in the same consumer group consumes messages from a single shard in the stream. This is implemented using the DynamoDbLockRegistry.
DynamoDB LockRegistry properties are prefixed with spring.cloud.stream.kinesis.binder.locks.:
- table: The name to give the DynamoDB table. Default: SpringIntegrationLockRegistry.
- billingMode: The billing mode of the DynamoDB table. See DynamoDB On-Demand Mode. Possible values are provisioned and payPerRequest. If left empty or set to payPerRequest, both readCapacity and writeCapacity are ignored. Default: payPerRequest.
- readCapacity: The read capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned. Default: 1.
- writeCapacity: The write capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned. Default: 1.
- leaseDuration: The length of time that the lease for the lock is granted for. If this is set to, for example, 30 seconds, the lock is considered expired after that period and a new owner can acquire it. See also DynamoDB TTL. Default: 20.
- refreshPeriod: How long to wait before trying to acquire the lock again (if set to 10 seconds, for example, it attempts to do so every 10 seconds). Default: 1.
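For example, to point the checkpoint and lock stores at application-specific DynamoDB tables (the table names and values here are illustrative):
spring.cloud.stream.kinesis.binder.checkpoint.table=my-app-metadata
spring.cloud.stream.kinesis.binder.locks.table=my-app-locks
spring.cloud.stream.kinesis.binder.locks.leaseDuration=30
spring.cloud.stream.kinesis.binder.locks.refreshPeriod=2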
The following properties are available for Kinesis consumers only and must be prefixed with spring.cloud.stream.kinesis.bindings.<channel-name>.consumer.:
- startTimeout: The amount of time to wait for the consumer to start, in milliseconds. Default: 60000.
- listenerMode: The mode in which records are processed. If record, each Message contains the byte[] from a single Record.data. If batch, each Message contains a List<byte[]> extracted from the consumed records. When useNativeDecoding = true is used on the consumer together with listenerMode = batch, no out-of-the-box conversion happens and the resulting message contains a payload of List<software.amazon.awssdk.services.kinesis.model.Record>. It is up to the target application to convert those records manually. Default: record.
- checkpointMode: The mode in which checkpoints are updated. If record, checkpoints occur after each record is processed (this option is effective only if listenerMode is set to record). If batch, checkpoints occur after each batch of records is processed. If manual, checkpoints occur on demand via the Checkpointer callback. If periodic, checkpoints occur at the specified time interval (from the interval property in the checkpoint configuration). Default: batch.
- checkpointInterval: The interval, in milliseconds, between two checkpoints when the checkpoint mode is periodic. Default: 5000.
- workerId: The worker identifier used to distinguish different workers/processes (only used when the Kinesis Client Library is enabled). No default; if not set, the default value inside spring-integration-aws is used (a random UUID).
- fanOut: The KCL retrieval mode: true for fan-out, false for polling. Default: true.
- metricsLevel: The level of metrics to be collected by the Kinesis Client Library. Possible values are NONE, SUMMARY, and DETAILED. Default: DETAILED.
- recordsLimit: The maximum number of records to poll per GetRecords request. Must not be greater than 10000. Default: 1000.
- idleBetweenPolls: The delay between Kinesis record requests to satisfy AWS throughput requirements. Default: 1000.
- consumerBackoff: The amount of time, in milliseconds, the consumer waits before attempting another GetRecords operation after a read with no results. Default: 1000.
- shardIteratorType: The software.amazon.awssdk.services.kinesis.model.ShardIteratorType name, with an optional sequenceNumber for AT_SEQUENCE_NUMBER/AFTER_SEQUENCE_NUMBER, or milliseconds for AT_TIMESTAMP, after a colon. For example: AT_TIMESTAMP:1515090166767. Default: LATEST for anonymous groups and TRIM_HORIZON otherwise.
Note
|
When the TRIM_HORIZON shard iterator type is used, we need to take into account the time lag that occurs while the ShardIterator is pointed at the last untrimmed record in the shard (the oldest data record in the shard).
The getRecords() calls then have to move from that point up to the latest record, which takes time.
The retention period is 1 day by default and can be extended up to 7 days.
This happens only for new consumer groups; any subsequent starts of a consumer in the same group are adjusted according to the stored checkpoint via the AFTER_SEQUENCE_NUMBER iterator type.
|
- shardId: An explicit shard id to consume from.
Note
|
The Kinesis Client Library does not support a configuration for a specific shard.
When the shardId property is used, it is ignored for the Kinesis Client Library and the standard stream consumer distribution is applied.
Also, in the case of instanceCount > 1, the application throws a validation exception: instanceCount and shardId are considered mutually exclusive.
|
- embedHeaders: Whether to extract headers and payload from Kinesis record data. Default: false.
- emptyRecordList: Whether to emit an empty list of records into a consumer. Works only with listenerMode = batch. Default: false.
- leaseTableName: The KCL table name for leases. Default: the consumer group name.
- pollingMaxRecords: The KCL maximum number of records per request in polling mode. Default: 10000.
- pollingIdleTime: The KCL idle time between requests in polling mode. Default: 1500.
- gracefulShutdownTimeout: The KCL graceful shutdown timeout, in milliseconds. Default: 0 (regular shutdown process).
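Putting several of these together, a batch consumer with manual checkpoints that starts from the beginning of the stream might be configured as follows (the binding name input-in-0 is illustrative):
spring.cloud.stream.kinesis.bindings.input-in-0.consumer.listenerMode=batch
spring.cloud.stream.kinesis.bindings.input-in-0.consumer.checkpointMode=manual
spring.cloud.stream.kinesis.bindings.input-in-0.consumer.shardIteratorType=TRIM_HORIZON
spring.cloud.stream.kinesis.bindings.input-in-0.consumer.recordsLimit=5000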
Starting with version 4.0.4 (basically since spring-integration-aws-3.0.8), the KclMessageDrivenChannelAdapter can be customized programmatically for the ConfigsBuilder parts.
For example, to set a custom value for the LeaseManagementConfig.maxLeasesForWorker property, a ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> bean has to be provided:
@Bean
ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> consumerEndpointCustomizer() {
    // Limit the number of leases (shards) a single KCL worker can take
    return (endpoint, destinationName, group) ->
            endpoint.setLeaseManagementConfigCustomizer(leaseManagementConfig ->
                    leaseManagementConfig.maxLeasesForWorker(10));
}
Other similar setters on the KclMessageDrivenChannelAdapter are:
setCoordinatorConfigCustomizer(Consumer<CoordinatorConfig> coordinatorConfigCustomizer);
setLifecycleConfigCustomizer(Consumer<LifecycleConfig> lifecycleConfigCustomizer);
setMetricsConfigCustomizer(Consumer<MetricsConfig> metricsConfigCustomizer);
The following properties are available for Kinesis producers only and must be prefixed with spring.cloud.stream.kinesis.bindings.<bindingTarget>.producer.:
- sync: Whether the producer should act in a synchronous manner with respect to writing records into a stream. If true, the producer waits for a response from Kinesis after a PutRecord operation. Default: false.
- sendTimeout: Effective only if sync is set to true. The amount of time, in milliseconds, to wait for a response from Kinesis after a PutRecord operation. Default: 10000.
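For example, a producer that blocks on each send for up to 15 seconds might be configured as follows (the binding name is illustrative):
spring.cloud.stream.kinesis.bindings.output-out-0.producer.sync=true
spring.cloud.stream.kinesis.bindings.output-out-0.producer.sendTimeout=15000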
Also, if you would like to produce a batch of records into a Kinesis stream, the message payload must be a PutRecordsRequest instance, and the general Spring Cloud Stream producer property useNativeEncoding must be set to true, so Spring Cloud Stream will not try to convert the PutRecordsRequest into a byte[]. The content of the PutRecordsRequest is then the end-user's responsibility (see the sketch after the following properties).
- embedHeaders: Whether to serialize message headers and payload into Kinesis record data. Default: false.
- recordMetadataChannel: The bean name of a MessageChannel to which successful send results should be sent. Works only in async mode.
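The following is a minimal sketch of such a batch send using StreamBridge, assuming useNativeEncoding=true is set on the output binding; the stream name, binding name, and class name are illustrative:
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

@Component
public class BatchProducer {

    private final StreamBridge streamBridge;

    public BatchProducer(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public void sendBatch() {
        // Build a batch of two records for the target stream
        PutRecordsRequest batch = PutRecordsRequest.builder()
                .streamName("my-test-stream")
                .records(
                        PutRecordsRequestEntry.builder()
                                .partitionKey("key-1")
                                .data(SdkBytes.fromUtf8String("payload-1"))
                                .build(),
                        PutRecordsRequestEntry.builder()
                                .partitionKey("key-2")
                                .data(SdkBytes.fromUtf8String("payload-2"))
                                .build())
                .build();

        // With useNativeEncoding=true, the request is passed to the
        // KinesisMessageHandler without conversion
        this.streamBridge.send("output-out-0", batch);
    }
}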
The binder can be configured to send producer exceptions to an error channel. See the section on Spring Cloud Stream error handling for more information.
The payload of the ErrorMessage for a send failure is an AwsRequestFailureException with the following properties:
- failedMessage: the spring-messaging Message<?> that failed to be sent.
- request: the raw AwsRequest (either PutRecordRequest or PutRecordsRequest) that was created from the failedMessage.
There is no automatic handling of these exceptions (such as sending to a dead letter queue), but you can consume these exceptions with your own Spring Integration flow.
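As a sketch, assuming the producer binding publishes to a destination named myStream and has errorChannelEnabled=true (so that failures land on the myStream.errors channel, following the Spring Cloud Stream convention), such exceptions might be consumed like this:
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsRequestFailureException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class KinesisSendFailureHandler {

    @ServiceActivator(inputChannel = "myStream.errors")
    public void handleSendFailure(ErrorMessage errorMessage) {
        AwsRequestFailureException exception =
                (AwsRequestFailureException) errorMessage.getPayload();
        // The failed message and the raw AWS request are available for custom
        // recovery, for example logging or re-publishing
        System.err.println("Failed message: " + exception.getFailedMessage());
        System.err.println("AWS request: " + exception.getRequest());
    }
}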
Starting with version 1.2, if your Spring Cloud Stream application acts only in the source role, the extra beans required for a sink (that is, Kinesis consumers) are not registered in the application context, so there is no need to worry about their resources (DynamoDB and CloudWatch) on AWS.
In order to run properly on AWS, the role used by the application needs a set of policies configured. Here are the policy statements that your application role needs:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:ListShards",
"kinesis:SubscribeToShard",
"kinesis:DescribeStreamSummary",
"kinesis:DescribeStreamConsumer",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:PutRecords",
"kinesis:DescribeStream"
],
"Resource": [
"arn:aws:kinesis:<region>:<account_number>:*/*/consumer/*:*",
"arn:aws:kinesis:<region>:<account_number>:stream/<stream_name>"
]
},
{
"Effect": "Allow",
"Action": "kinesis:DescribeLimits",
"Resource": "*"
},
{
"Sid": "DynamoDB",
"Effect": "Allow",
"Action": [
"dynamodb:BatchGetItem",
"dynamodb:BatchWriteItem",
"dynamodb:PutItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:Query",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:DescribeTable"
],
"Resource": [
"arn:aws:dynamodb:<region>:<account>:table/<name-of-metadata-table>",
"arn:aws:dynamodb:<region>:<account>:table/<name-of-lock-table>"
]
}
]
}
Keep in mind that these are only the policies that allow the application to consume/produce records from/to Kinesis. If you are going to allow spring-cloud-stream-binder-kinesis to create the resources for you, you will need an extra set of policies.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"kinesis:CreateStream",
"kinesis:UpdateShardCount",
"kinesis:EnableEnhancedMonitoring",
"kinesis:DisableEnhancedMonitoring",
"dynamodb:DeleteTable",
"dynamodb:UpdateTable"
],
"Resource": [
"arn:aws:dynamodb:<region>:<account>:table/<table_name>",
"arn:aws:kinesis:<region>:<account>:stream/<stream_name>"
]
}
]
}
If [Server-Side Encryption](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html) is enabled, you’ll need the following set of policies to encrypt and decrypt Kinesis messages.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kms:GenerateDataKey",
"kms:Decrypt"
],
"Resource": [
"arn:aws:kms:<region>:<account>:key/<kms-key-uuid>"
]
}
]
}
Sometimes we do not have the necessary permissions to connect to the real Kinesis and DynamoDB from a developer machine. In moments like this, it is pretty useful to set up LocalStack in your project, so you can run everything locally without having to worry about permissions and enterprise restrictions.
Create a docker-compose.yaml file in the root of your project to quickly start LocalStack:
version: '3.5'
services:
  localstack:
    image: localstack/localstack:2.1.0
    environment:
      - AWS_DEFAULT_REGION=sa-east-1
      - EDGE_PORT=4566
      - SERVICES=kinesis,dynamodb
    ports:
      - '4566:4566'
    volumes:
      - localstack:/tmp/localstack
      - './setup-localstack.sh:/etc/localstack/init/ready.d/setup-localstack.sh' # ready hook
volumes:
  localstack:
After that, create a script called setup-localstack.sh in the root directory; it contains the commands to create the Kinesis stream and the two DynamoDB tables:
awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1
awslocal dynamodb create-table \
--table-name spring-stream-lock-registry \
--attribute-definitions AttributeName=lockKey,AttributeType=S \
--key-schema AttributeName=lockKey,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--tags Key=Owner,Value=localstack
awslocal dynamodb create-table \
--table-name spring-stream-metadata \
--attribute-definitions AttributeName=metadataKey,AttributeType=S \
--key-schema AttributeName=metadataKey,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--tags Key=Owner,Value=localstack
awslocal dynamodb list-tables
awslocal kinesis list-streams
Since this file is mapped into the localstack container, the container automatically runs the script the first time you start it.
To run localstack, just execute
docker-compose up -d
Your local AWS Endpoint is now available at http://localhost:4566
To put records into your test stream, just run
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name my-test-stream --partition-key 1 --data <base64-encoded-data>
By default, the Kinesis and DynamoDB clients will try to hit the real AWS endpoints. To change this behavior, it is enough to provide these properties for the Spring Cloud AWS auto-configuration:
spring.cloud.aws.endpoint
spring.cloud.aws.region.static
spring.cloud.aws.credentials.access-key
spring.cloud.aws.credentials.secret-key
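For the LocalStack setup above, that might look like the following (LocalStack accepts any dummy credentials, so test/test are placeholders):
spring.cloud.aws.endpoint=http://localhost:4566
spring.cloud.aws.region.static=sa-east-1
spring.cloud.aws.credentials.access-key=test
spring.cloud.aws.credentials.secret-key=test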
Another way to test against LocalStack is to use Testcontainers with a localstack/localstack image container.
The LocalstackContainerTest interface in this project can serve as a good sample of how to configure the container and how to use the AWS clients.
Version 2.0 introduced a KinesisBinderHealthIndicator implementation, which is a part of the BindersHealthContributor composition under the binders path.
The out-of-the-box implementation iterates over the Kinesis streams involved in the binder configuration, calling a describeStream command against each of them.
If any of the streams does not exist, the health is treated as DOWN.
If a LimitExceededException is thrown, in line with describeStream limitations, the KinesisBinderHealthIndicator retries after a one-second interval.
Only when all the configured streams are described properly is UP health returned.
You can override the out-of-the-box implementation by providing your own bean with the kinesisBinderHealthIndicator name, as in the sketch below.
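A minimal sketch of such an override, assuming you simply want to bypass the describeStream checks, might look like this:
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KinesisHealthOverrideConfiguration {

    @Bean("kinesisBinderHealthIndicator")
    HealthIndicator kinesisBinderHealthIndicator() {
        // Always report UP instead of calling describeStream for each stream
        return () -> Health.up().build();
    }
}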