A lease is data that defines the binding between a worker and a shard.
Distributed KCL consumer applications use leases to partition data record processing across a fleet of workers.
At any given time, each shard of data records is bound to a particular worker by a lease, identified by the `leaseKey` variable.
This document describes the lease lifecycle.
Note: for simplicity, shard ids are shortened from `shardId-000000000042` to read as `shardId[_-]42`.
In KCL, a lease provides a temporal assignment between one shard and an assigned worker. Leases are persistent for the duration of shard processing (detailed later). However, the worker that is processing a lease may change since leases may be "stolen" by other workers in the same KCL application.
To persist metadata about lease state (e.g., last read checkpoint, current assigned worker), KCL creates a lease table in DynamoDB. Each KCL application will have its own distinct lease table that includes the application name. More information, including schema, is provided at KCL Lease Table.
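As a rough mental model of what a lease row persists, consider the following sketch. This is not the authoritative schema (which is documented at KCL Lease Table); the field names here are illustrative:

```java
// Simplified, illustrative model of the per-lease metadata KCL persists in
// its DynamoDB lease table. See KCL Lease Table for the authoritative schema.
class LeaseRecord {
    String leaseKey;    // uniquely identifies the shard this lease is bound to
    String leaseOwner;  // worker currently assigned to this lease
    long leaseCounter;  // bumped on renewal; a stale counter signals expiry
    String checkpoint;  // last position processed in the shard
}
```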
Leases are unique to the shard and are not recycled for stream operations (i.e., split, merge). A shard created by stream operations will generate a new lease.
Note that the number of tracked leases may exceed the number of shards. Per the diagram above, this can occur while stream operations propagate through KCL. For example, a 10-shard stream that is split on every shard may temporarily have up to 30 leases: 10 original + 20 children.
Note: leases are uniquely identified by their `leaseKey`, which looks vastly different than `lease_X`. For details on the `leaseKey` format, please see KCL LeaseTable.
Leases follow a relatively simple, progressive state machine:

`DISCOVERY -> CREATION -> PROCESSING -> SHARD_END -> DELETION`

Excluding `SHARD_END`, these phases are illustrative of KCL logic and are not explicitly codified.
`DISCOVERY`: KCL shard syncing identifies new shards. Discovered shards may result from:
- First-time start of KCL with an empty lease table.
- Stream operations (i.e., split, merge) that create child shards.
- In multi-stream mode, dynamic discovery of a new stream.

`CREATION`: Leases are created 1:1 for each discovered shard.
- Leases are only created if they are eligible for processing. For example, child shards will not have leases created until their parent(s) have reached `SHARD_END`.
- Leases are initialized at the configured initial position.
  - A notable exception is that child leases are initialized at `TRIM_HORIZON` to avoid processing gaps from their parent lease(s).

`PROCESSING`: Leases are processed and continually updated with new checkpoints.
- In general, leases spend the majority of their life in this state.

`SHARD_END`: The associated shard is at `SHARD_END` and all records in the shard have been processed by KCL.

`DELETION`: Since there are no more records to process, KCL will delete the lease from the lease table.
- Lease deletion will not occur until after its child lease(s) enter `PROCESSING`. This tombstone helps KCL ensure durability and convergence for all discovered leases.
- For more information, see LeaseCleanupManager#cleanupLeaseForCompletedShard(...).
- Deletion is configurable, yet recommended to minimize the I/O of lease table scans.
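The progressive state machine above can be modeled as a simple enum. This is purely illustrative: as noted, KCL does not explicitly codify these phases (excepting `SHARD_END`).

```java
// Illustrative model of the lease lifecycle; not actual KCL code.
enum LeasePhase {
    DISCOVERY, CREATION, PROCESSING, SHARD_END, DELETION;

    // Returns the next phase in the progressive state machine,
    // or null once the lease has been deleted.
    LeasePhase next() {
        LeasePhase[] phases = values();
        int i = ordinal() + 1;
        return i < phases.length ? phases[i] : null;
    }
}
```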
Shard syncing is a complex responsibility owned by the leader host in a KCL application. By invoking the ListShards API, KCL identifies the shards for the configured stream(s). This process runs at a configurable interval to determine whether a shard sync should be executed to identify new shards. A shard sync is not guaranteed to identify new shards (e.g., when KCL has already discovered all existing shards).
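The recurring schedule described above can be sketched with a `ScheduledExecutorService`. This is a hypothetical sketch, not KCL's `Scheduler` or `PeriodicShardSyncManager`; the `ListShards` call and the should-we-sync decision are abstracted into the supplied `Runnable`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a leader host running recurring shard-sync checks.
// The real KCL classes differ in detail; names here are not KCL's API.
class ShardSyncScheduleSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Runs shardSyncCheck at a fixed delay; each run decides internally
    // whether a full shard sync is actually needed.
    void start(long intervalMillis, Runnable shardSyncCheck) {
        executor.scheduleWithFixedDelay(
                shardSyncCheck, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }
}
```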
The following diagram is an abridged sequence diagram of key classes that initialize the shard sync workflow:
The following diagram outlines the key classes involved in the shard sync workflow:
For more information, here are the links to KCL code:
- `Scheduler`: implementation
- `LeaseCoordinator`: interface, implementation
- `PeriodicShardSyncManager`: implementation
- `ShardSyncTask`: interface, implementation
- `ShardDetector`: interface, implementation
- `HierarchicalShardSyncer`: implementation
- `LeaseRefresher`: interface, implementation
- `LeaseSynchronizer`: implementation
Lease creation is a deterministic process. To illustrate how KCL operates, assume a stream has the following shard hierarchy:
```
Shard structure (each level depicts a stream segment):
0 1 2 3 4   5   - shards till epoch 102
\ / \ / |   |
 6   7  4   5   - shards from epoch 103 - 205
  \ /   |  / \
   8    4 9  10 - shards from epoch 206+ (still open)
```
Then `NonEmptyLeaseTableSynchronizer` would create leases dependent on the configured initial position. Assuming leases `(4, 5, 7)` already exist, the leases created for each initial position would be:

- `LATEST` creates `(6)` to resolve the gap on-par with epochs 103-205, which is required to eventually reach `LATEST`.
- `TRIM_HORIZON` creates `(0, 1)` to resolve the gap starting from the `TRIM_HORIZON`.
- `AT_TIMESTAMP(epoch=200)` creates `(0, 1)` to resolve the gap leading into epoch 200.
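The `TRIM_HORIZON` case above can be sketched as follows. This is an illustrative model, not KCL's actual `NonEmptyLeaseTableSynchronizer` logic: it assumes a root shard needs a lease only when neither it nor any descendant already holds one, and uses the simplified integer shard ids from the diagram.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of TRIM_HORIZON lease creation against a non-empty
// lease table. Names and structure are illustrative, not KCL's.
class TrimHorizonLeaseSketch {
    // childrenOf maps a shard to its child shards; roots are the oldest
    // shards; leases is the set of shards that already have leases.
    static Set<Integer> leasesToCreate(Map<Integer, List<Integer>> childrenOf,
                                       Set<Integer> roots, Set<Integer> leases) {
        Set<Integer> created = new TreeSet<>();
        for (int root : roots) {
            if (!covered(root, childrenOf, leases)) {
                created.add(root);
            }
        }
        return created;
    }

    // True if the shard, or any descendant, already has a lease.
    private static boolean covered(int shard, Map<Integer, List<Integer>> childrenOf,
                                   Set<Integer> leases) {
        if (leases.contains(shard)) return true;
        for (int child : childrenOf.getOrDefault(shard, List.of())) {
            if (covered(child, childrenOf, leases)) return true;
        }
        return false;
    }
}
```

With the diagram's hierarchy and existing leases `(4, 5, 7)`, this model yields `(0, 1)`: shards 2 and 3 are skipped because their child 7 already has a lease, while 4 and 5 hold leases themselves.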
To reduce Kinesis Data Streams API calls, KCL attempts to avoid unnecessary shard syncs. For example, if the discovered shards cover the entire partition range, then a shard sync is unlikely to yield a material difference. For more information, see PeriodicShardSyncManager#checkForShardSync(...).
KCL balances leases across workers at an interval configured by `leaseDuration` and `epsilonMillis`.
Lease balancing is done to protect against interruptions in processing should a worker stop updating the lease table (e.g., host failure).
This operation only accounts for lease assignments and does not factor in I/O load.
For example, leases that are equally distributed across workers are not guaranteed to have equal I/O distribution.
For more information, here are the links to KCL code:
- `LeaseCoordinator`: interface, implementation
- `LeaseTaker`: interface, implementation
- `LeaseRefresher`: interface, implementation
Leases are stolen if, and only if, there were zero expired leases and the worker looking to steal desires more leases. Stolen leases are randomly selected from whichever worker has the most leases. The maximum number of leases to steal in each loop is configured via `maxLeasesToStealAtOneTime`.
Customers should consider the following trade-offs when configuring the lease-taking cadence:
- `LeaseRefresher` invokes a DynamoDB `scan` against the lease table, which has a cost proportional to the number of leases.
- Frequent balancing may cause high lease turnover, which incurs DynamoDB `write` costs and potentially redundant work for stolen leases.
- A low `maxLeasesToStealAtOneTime` may increase the time to fully (re)assign leases after an impactful event (e.g., deployment, host failure).
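The stealing rule described above (random selection from the most-loaded worker, bounded by `maxLeasesToStealAtOneTime`) can be sketched as follows. Class and method names are hypothetical, not KCL's `LeaseTaker` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of lease stealing: pick up to maxLeasesToStealAtOneTime
// leases at random from the worker currently holding the most leases.
class LeaseStealingSketch {
    static List<String> chooseLeasesToSteal(Map<String, List<String>> leasesByWorker,
                                            int maxLeasesToStealAtOneTime) {
        // Find the most-loaded worker's lease set.
        List<String> busiest = Collections.emptyList();
        for (List<String> held : leasesByWorker.values()) {
            if (held.size() > busiest.size()) {
                busiest = held;
            }
        }
        // Randomly select leases, bounded by the per-loop maximum.
        List<String> candidates = new ArrayList<>(busiest);
        Collections.shuffle(candidates);
        return candidates.subList(0, Math.min(maxLeasesToStealAtOneTime, candidates.size()));
    }
}
```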
Recommended reading: